[SCB-2662]trigger pull instance event when instance isolated (#3276)

diff --git a/demo/demo-jaxrs/jaxrs-client/pom.xml b/demo/demo-jaxrs/jaxrs-client/pom.xml
index b1614fc..5acbb0b 100644
--- a/demo/demo-jaxrs/jaxrs-client/pom.xml
+++ b/demo/demo-jaxrs/jaxrs-client/pom.xml
@@ -45,6 +45,10 @@
       <groupId>org.apache.servicecomb</groupId>
       <artifactId>provider-pojo</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.servicecomb</groupId>
+      <artifactId>handler-governance</artifactId>
+    </dependency>
     <!-- can be added in local test -->
     <!-- This jar will add an environment version={project.version}, and may cause spring boot application
        startup fail -->
diff --git a/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/TestCodeFirstJaxrs.java b/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/TestCodeFirstJaxrs.java
index 66d0729..4e6cbd9 100644
--- a/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/TestCodeFirstJaxrs.java
+++ b/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/TestCodeFirstJaxrs.java
@@ -20,6 +20,7 @@
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.servicecomb.core.provider.consumer.InvokerUtils;
 import org.apache.servicecomb.demo.CategorizedTestCase;
@@ -27,6 +28,7 @@
 import org.apache.servicecomb.demo.server.User;
 import org.apache.servicecomb.foundation.vertx.http.ReadStreamPart;
 import org.apache.servicecomb.provider.pojo.RpcReference;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 import org.springframework.stereotype.Component;
 
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -51,6 +53,39 @@
   }
 
   @Override
+  public void testHighwayTransport() throws Exception {
+    // test only once
+    testInstanceIsolation();
+  }
+
+  private void testInstanceIsolation() {
+    AtomicInteger e503Business = new AtomicInteger(0);
+    AtomicInteger e503CircuitBreaker = new AtomicInteger(0);
+
+    for (int i = 0; i < 30; i++) {
+      try {
+        InvokerUtils.syncInvoke(SERVICE_NAME, SCHEMA_ID, "instanceIsolationTest", null,
+            String.class);
+      } catch (InvocationException e) {
+        if (e.getStatusCode() == 503) {
+          if ("CommonExceptionData [message=business]".equals(e.getErrorData().toString())) {
+            e503Business.getAndIncrement();
+          } else if ("CommonExceptionData [message=instance isolation circuitBreaker is open.]".equals(
+              e.getErrorData().toString())) {
+            e503CircuitBreaker.getAndIncrement();
+          } else {
+            TestMgr.fail("not expected message");
+          }
+        } else {
+          TestMgr.fail("not expected code");
+        }
+      }
+    }
+    TestMgr.check(true, e503Business.get() >= 10);
+    TestMgr.check(true, e503CircuitBreaker.get() >= 10);
+  }
+
+  @Override
   public void testRestTransport() throws Exception {
     testDeleteAfterFinished();
   }
diff --git a/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml b/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
index a0ac2a2..88b099f 100644
--- a/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
+++ b/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
@@ -25,12 +25,32 @@
   handler:
     chain:
       Consumer:
-        default: bizkeeper-consumer,loadbalance
+        default: loadbalance,instance-isolation-consumer,instance-bulkhead-consumer
   loadbalance:
     strategy:
       name: mycustomrule
     retryEnabled: true
 
+    filter:
+      isolation:
+        enabled: false
+
+  matchGroup:
+    instanceIsolationTest: |
+      matches:
+        - apiPath:
+            prefix: "/codeFirstJaxrs/instanceIsolationTest"
+  instanceIsolation:
+    instanceIsolationTest: |
+      minimumNumberOfCalls: 10
+      slidingWindowSize: 20
+      slidingWindowType: COUNT_BASED
+      failureRateThreshold: 50
+      slowCallRateThreshold: 100
+      slowCallDurationThreshold: 3000
+      waitDurationInOpenState: 200
+      permittedNumberOfCallsInHalfOpenState: 10
+
   request:
     timeout: 30000
     jaxrs:
diff --git a/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/CodeFirstJaxrs.java b/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/CodeFirstJaxrs.java
index 3fedafd..81e350f 100644
--- a/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/CodeFirstJaxrs.java
+++ b/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/CodeFirstJaxrs.java
@@ -58,6 +58,7 @@
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.context.ContextUtils;
 import org.apache.servicecomb.swagger.invocation.context.InvocationContext;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
 
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
@@ -150,7 +151,8 @@
   public String testRawJsonString(String jsonInput) {
     Map<String, String> person;
     try {
-      person = RestObjectMapperFactory.getRestObjectMapper().readValue(jsonInput.getBytes(StandardCharsets.UTF_8), Map.class);
+      person = RestObjectMapperFactory.getRestObjectMapper()
+          .readValue(jsonInput.getBytes(StandardCharsets.UTF_8), Map.class);
     } catch (Exception e) {
       e.printStackTrace();
       return null;
@@ -209,7 +211,8 @@
   public String testRawJsonAnnotation(@RawJsonRequestBody String jsonInput) {
     Map<String, String> person;
     try {
-      person = RestObjectMapperFactory.getRestObjectMapper().readValue(jsonInput.getBytes(StandardCharsets.UTF_8), Map.class);
+      person = RestObjectMapperFactory.getRestObjectMapper()
+          .readValue(jsonInput.getBytes(StandardCharsets.UTF_8), Map.class);
     } catch (Exception e) {
       e.printStackTrace();
       return null;
@@ -275,6 +278,12 @@
         .setSubmittedFileName(name);
   }
 
+  @Path("/instanceIsolationTest")
+  @GET
+  public String instanceIsolationTest() {
+    throw new InvocationException(503, "", "business");
+  }
+
   private File createTempFile(String name, String content) throws IOException {
     File systemTempFile = new File(System.getProperty("java.io.tmpdir"));
     File file = new File(systemTempFile, name);
diff --git a/demo/demo-spring-boot-provider/demo-spring-boot-jaxrs-client/src/main/resources/application.yml b/demo/demo-spring-boot-provider/demo-spring-boot-jaxrs-client/src/main/resources/application.yml
index f0dc3cd..8631c2a 100644
--- a/demo/demo-spring-boot-provider/demo-spring-boot-jaxrs-client/src/main/resources/application.yml
+++ b/demo/demo-spring-boot-provider/demo-spring-boot-jaxrs-client/src/main/resources/application.yml
@@ -32,4 +32,4 @@
   handler:
     chain:
       Consumer:
-        default: bizkeeper-consumer,loadbalance
+        default: loadbalance,instance-isolation-consumer,instance-bulkhead-consumer
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/event/ServiceCenterEventBus.java b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/api/event/ServiceCenterEventBus.java
similarity index 94%
rename from service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/event/ServiceCenterEventBus.java
rename to foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/api/event/ServiceCenterEventBus.java
index 316cc32..204ee72 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/event/ServiceCenterEventBus.java
+++ b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/api/event/ServiceCenterEventBus.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.serviceregistry.event;
+package org.apache.servicecomb.registry.api.event;
 
 import org.apache.servicecomb.foundation.common.event.SimpleEventBus;
 
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
index 73d3dbb..c6809de 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
@@ -27,6 +27,9 @@
 import org.apache.servicecomb.foundation.common.utils.BeanUtils;
 import org.apache.servicecomb.governance.handler.InstanceIsolationHandler;
 import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.apache.servicecomb.registry.api.MicroserviceKey;
+import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
+import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus;
 import org.apache.servicecomb.swagger.invocation.AsyncResponse;
 import org.apache.servicecomb.swagger.invocation.Response;
 import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
@@ -65,17 +68,27 @@
       }
 
       if (e instanceof CallNotPermittedException) {
+        LOGGER.warn("instance isolation circuitBreaker is open by policy : {}", e.getMessage());
+        ServiceCenterEventBus.getEventBus().post(createMicroserviceInstanceChangedEvent(invocation));
         // return 503 so that consumer can retry
         asyncResp.complete(
             Response.failResp(new InvocationException(503, "instance isolation circuitBreaker is open.",
                 new CommonExceptionData("instance isolation circuitBreaker is open."))));
-        LOGGER.warn("instance isolation circuitBreaker is open by policy : {}", e.getMessage());
       } else {
         asyncResp.complete(Response.createProducerFail(e));
       }
     });
   }
 
+  private Object createMicroserviceInstanceChangedEvent(Invocation invocation) {
+    MicroserviceInstanceChangedEvent event = new MicroserviceInstanceChangedEvent();
+    MicroserviceKey key = new MicroserviceKey();
+    key.setAppId(invocation.getAppId());
+    key.setServiceName(invocation.getMicroserviceName());
+    event.setKey(key);
+    return event;
+  }
+
   private void addCircuitBreaker(DecorateCompletionStage<Response> dcs, GovernanceRequest request) {
     CircuitBreaker circuitBreaker = instanceIsolationHandler.getActuator(request);
     if (circuitBreaker != null) {
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java
index e1bb76c..dbb31c7 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java
@@ -33,7 +33,7 @@
 import org.apache.servicecomb.service.center.client.model.RbacTokenRequest;
 import org.apache.servicecomb.service.center.client.model.RbacTokenResponse;
 import org.apache.servicecomb.serviceregistry.event.NotPermittedEvent;
-import org.apache.servicecomb.serviceregistry.event.ServiceCenterEventBus;
+import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
index 32effa0..9c5ed1d 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
@@ -63,7 +63,7 @@
 import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient;
 import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
 import org.apache.servicecomb.serviceregistry.event.NotPermittedEvent;
-import org.apache.servicecomb.serviceregistry.event.ServiceCenterEventBus;
+import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus;
 import org.apache.servicecomb.serviceregistry.task.HeartbeatResult;
 import org.apache.servicecomb.serviceregistry.task.MicroserviceInstanceHeartbeatTask;
 import org.slf4j.Logger;
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ClassificationAddress.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ClassificationAddress.java
index b81e070..46da313 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ClassificationAddress.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ClassificationAddress.java
@@ -43,7 +43,7 @@
 import org.apache.servicecomb.registry.cache.InstanceCacheManager;
 import org.apache.servicecomb.registry.definition.DefinitionConst;
 import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
-import org.apache.servicecomb.serviceregistry.event.ServiceCenterEventBus;
+import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus;
 import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache;
 import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheRefreshedEvent;
 
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index 6896ed5..cf32dde 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -19,13 +19,15 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
 import org.apache.servicecomb.registry.DiscoveryManager;
 import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
-import org.apache.servicecomb.serviceregistry.event.ShutdownEvent;
 import org.apache.servicecomb.registry.api.registry.BasePath;
 import org.apache.servicecomb.registry.api.registry.Microservice;
 import org.apache.servicecomb.registry.api.registry.MicroserviceFactory;
@@ -36,11 +38,11 @@
 import org.apache.servicecomb.serviceregistry.api.Const;
 import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient;
 import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
+import org.apache.servicecomb.serviceregistry.event.ShutdownEvent;
 import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache;
 import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheKey;
 import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheRefreshedEvent;
 import org.apache.servicecomb.serviceregistry.registry.cache.RefreshableServiceRegistryCache;
-import org.apache.servicecomb.serviceregistry.registry.cache.ServiceRegistryCache;
 import org.apache.servicecomb.serviceregistry.task.MicroserviceServiceCenterTask;
 import org.apache.servicecomb.serviceregistry.task.ServiceCenterTask;
 import org.slf4j.Logger;
@@ -48,7 +50,6 @@
 
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.MoreExecutors;
 
 public abstract class AbstractServiceRegistry implements ServiceRegistry {
   private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServiceRegistry.class);
@@ -65,12 +66,15 @@
 
   protected ServiceCenterTask serviceCenterTask;
 
-  protected ExecutorService executorService = MoreExecutors.newDirectExecutorService();
+  private final ExecutorService executorService = new ThreadPoolExecutor(1, 1,
+      0L, TimeUnit.MILLISECONDS,
+      new ArrayBlockingQueue<>(5),
+      r -> new Thread(r, "instance-changed-event-task"));
+
+  protected RefreshableServiceRegistryCache serviceRegistryCache;
 
   private String name;
 
-  RefreshableServiceRegistryCache serviceRegistryCache;
-
   public AbstractServiceRegistry(EventBus eventBus, ServiceRegistryConfig serviceRegistryConfig,
       Configuration configuration) {
     setName(serviceRegistryConfig.getRegistryName());
@@ -257,23 +261,26 @@
     this.name = name;
   }
 
-  public ServiceRegistryCache getServiceRegistryCache() {
-    return serviceRegistryCache;
-  }
-
   @Subscribe
   public void onShutdown(ShutdownEvent event) {
+    shutdownEventHandler(event);
+  }
+
+  protected void shutdownEventHandler(ShutdownEvent event) {
     LOGGER.info("service center task is shutdown.");
     executorService.shutdownNow();
   }
 
-  // post from watch eventloop, should refresh the exact microservice instances immediately
   @Subscribe
   public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) {
-    executorService.execute(new SuppressedRunnableWrapper(
-        () -> {
-          serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent);
-          DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstanceChanged(changedEvent);
-        }));
+    try {
+      executorService.execute(new SuppressedRunnableWrapper(
+          () -> {
+            serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent);
+            DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstanceChanged(changedEvent);
+          }));
+    } catch (Exception e) {
+      LOGGER.info("instance changed event ignored, {}", e.getMessage());
+    }
   }
 }
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
index f039736..2af08c3 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
@@ -30,6 +30,7 @@
 import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
 import org.apache.servicecomb.serviceregistry.event.HeartbeatFailEvent;
 import org.apache.servicecomb.serviceregistry.event.HeartbeatSuccEvent;
+import org.apache.servicecomb.serviceregistry.event.ShutdownEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +70,6 @@
         },
         (task, executor) -> LOGGER.warn("Too many pending tasks, reject " + task.toString())
     );
-    executorService = taskPool;
   }
 
   @Override
@@ -107,6 +107,12 @@
     return this.taskPool;
   }
 
+  @Override
+  protected void shutdownEventHandler(ShutdownEvent event) {
+    super.shutdownEventHandler(event);
+    this.taskPool.shutdownNow();
+  }
+
   @Subscribe
   public void onHeartbeatSuccEvent(HeartbeatSuccEvent heartbeatSuccEvent) {
     // 可以考虑多等待一个心跳周期,这样的好处是尽可能避免provider滞后于consumer注册的情况,consumer访问provider失败
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/ServiceRegistryFactory.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/ServiceRegistryFactory.java
index 966f414..9f7aedf 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/ServiceRegistryFactory.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/ServiceRegistryFactory.java
@@ -21,7 +21,7 @@
 import org.apache.servicecomb.foundation.common.event.SimpleEventBus;
 import org.apache.servicecomb.serviceregistry.ServiceRegistry;
 import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
-import org.apache.servicecomb.serviceregistry.event.ServiceCenterEventBus;
+import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.eventbus.EventBus;
diff --git a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java
index dbcf6be..977a5e2 100644
--- a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java
+++ b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java
@@ -34,10 +34,10 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.junit.jupiter.api.Assertions;
 
 public class TestConsumers extends TestRegistryBase {
   @Before
@@ -89,6 +89,10 @@
     key.setAppId(appId);
     key.setServiceName(serviceName);
     eventBus.post(event);
+    long begin = System.currentTimeMillis();
+    while (microserviceManager.getVersionsByName().size() > 0 && System.currentTimeMillis() - begin < 1000) {
+      Thread.yield();
+    }
     Assertions.assertEquals(0, microserviceManager.getVersionsByName().size());
   }