[SCB-2650][SCB-2290]support instance isolation and instance bulkhead (#3242)

diff --git a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
index 1eb345f..c663b70 100644
--- a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
+++ b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
@@ -37,7 +37,7 @@
   handler:
     chain:
       Consumer:
-        default: governance-consumer,loadbalance
+        default: loadbalance
       Provider:
         default: governance-provider
 
diff --git a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
index 7d4118f..972f3ef 100644
--- a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
+++ b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
@@ -33,6 +33,6 @@
   handler:
     chain:
       Consumer:
-        default: governance-consumer,loadbalance
+        default: loadbalance
       Provider:
         default: governance-provider
\ No newline at end of file
diff --git a/foundations/foundation-common/src/main/resources/config/base/log4j.properties b/foundations/foundation-common/src/main/resources/config/base/log4j.properties
index 83de4e4..0cdb3bf 100644
--- a/foundations/foundation-common/src/main/resources/config/base/log4j.properties
+++ b/foundations/foundation-common/src/main/resources/config/base/log4j.properties
@@ -25,7 +25,7 @@
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%p] %m %l%n
+log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss,SSS/zzz}][%t][%p]%m %l%n
 
 log4j.appender.paas=org.apache.servicecomb.foundation.common.utils.RollingFileAppenderExt
 log4j.appender.paas.file=${paas.logs.dir}${paas.logs.file}
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java b/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
index 03eb521..9307da4 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
@@ -22,6 +22,7 @@
 import org.apache.servicecomb.governance.handler.BulkheadHandler;
 import org.apache.servicecomb.governance.handler.CircuitBreakerHandler;
 import org.apache.servicecomb.governance.handler.FaultInjectionHandler;
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
 import org.apache.servicecomb.governance.handler.InstanceIsolationHandler;
 import org.apache.servicecomb.governance.handler.RateLimitingHandler;
 import org.apache.servicecomb.governance.handler.RetryHandler;
@@ -38,6 +39,7 @@
 import org.apache.servicecomb.governance.properties.BulkheadProperties;
 import org.apache.servicecomb.governance.properties.CircuitBreakerProperties;
 import org.apache.servicecomb.governance.properties.FaultInjectionProperties;
+import org.apache.servicecomb.governance.properties.InstanceBulkheadProperties;
 import org.apache.servicecomb.governance.properties.InstanceIsolationProperties;
 import org.apache.servicecomb.governance.properties.MatchProperties;
 import org.apache.servicecomb.governance.properties.RateLimitProperties;
@@ -59,6 +61,11 @@
   }
 
   @Bean
+  public InstanceBulkheadProperties instanceBulkheadProperties() {
+    return new InstanceBulkheadProperties();
+  }
+
+  @Bean
   public CircuitBreakerProperties circuitBreakerProperties() {
     return new CircuitBreakerProperties();
   }
@@ -95,6 +102,11 @@
   }
 
   @Bean
+  public InstanceBulkheadHandler instanceBulkheadHandler(InstanceBulkheadProperties instanceBulkheadProperties) {
+    return new InstanceBulkheadHandler(instanceBulkheadProperties);
+  }
+
+  @Bean
   public CircuitBreakerHandler circuitBreakerHandler(CircuitBreakerProperties circuitBreakerProperties,
       AbstractCircuitBreakerExtension circuitBreakerExtension,
       ObjectProvider<MeterRegistry> meterRegistry) {
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java b/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java
new file mode 100644
index 0000000..bdfe7d0
--- /dev/null
+++ b/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance.handler;
+
+import java.time.Duration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.apache.servicecomb.governance.policy.BulkheadPolicy;
+import org.apache.servicecomb.governance.properties.InstanceBulkheadProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadConfig;
+import io.github.resilience4j.bulkhead.BulkheadRegistry;
+
+public class InstanceBulkheadHandler extends AbstractGovernanceHandler<Bulkhead, BulkheadPolicy> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(InstanceBulkheadHandler.class);
+
+  private final InstanceBulkheadProperties bulkheadProperties;
+
+  public InstanceBulkheadHandler(InstanceBulkheadProperties bulkheadProperties) {
+    this.bulkheadProperties = bulkheadProperties;
+  }
+
+  @Override
+  protected String createKey(GovernanceRequest governanceRequest, BulkheadPolicy policy) {
+    return InstanceBulkheadProperties.MATCH_INSTANCE_BULKHEAD_KEY
+        + "." + governanceRequest.getServiceName()
+        + "." + policy.getName()
+        + "." + governanceRequest.getInstanceId();
+  }
+
+  @Override
+  protected void onConfigurationChanged(String key) {
+    if (key.startsWith(InstanceBulkheadProperties.MATCH_INSTANCE_BULKHEAD_KEY)) {
+      for (String processorKey : processors.keySet()) {
+        if (processorKey.startsWith(key)) {
+          processors.remove(processorKey);
+        }
+      }
+    }
+  }
+
+  @Override
+  public BulkheadPolicy matchPolicy(GovernanceRequest governanceRequest) {
+    if (StringUtils.isEmpty(governanceRequest.getServiceName()) || StringUtils.isEmpty(
+        governanceRequest.getInstanceId())) {
+      LOGGER.info("Instance bulkhead is not properly configured, service id or instance id is empty.");
+      return null;
+    }
+    return matchersManager.match(governanceRequest, bulkheadProperties.getParsedEntity());
+  }
+
+  @Override
+  public Bulkhead createProcessor(String key, GovernanceRequest governanceRequest, BulkheadPolicy policy) {
+    return getBulkhead(key, policy);
+  }
+
+  private Bulkhead getBulkhead(String key, BulkheadPolicy policy) {
+    LOGGER.info("applying new policy {} for {}", key, policy.toString());
+
+    BulkheadConfig config = BulkheadConfig.custom()
+        .maxConcurrentCalls(policy.getMaxConcurrentCalls())
+        .maxWaitDuration(Duration.parse(policy.getMaxWaitDuration()))
+        .build();
+
+    BulkheadRegistry registry = BulkheadRegistry.of(config);
+
+    return registry.bulkhead(key, config);
+  }
+}
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java b/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
index 8af8ae6..e7fea24 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
@@ -38,12 +38,15 @@
   }
 
   private static boolean statusCodeMatch(String status, String responseStatus) {
-    if (3 != status.length()) {
+    if (status == null) {
+      return false;
+    }
+    if (responseStatus.length() != status.length()) {
       return false;
     }
     char[] statusChar = status.toCharArray();
     char[] responseChar = responseStatus.toCharArray();
-    return IntStream.range(0, 3).noneMatch(i ->
+    return IntStream.range(0, statusChar.length).noneMatch(i ->
         statusChar[i] != responseChar[i] && statusChar[i] != 'x');
   }
 }
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java b/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
index 9996d1b..1d1bdfe 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
@@ -17,8 +17,9 @@
 package org.apache.servicecomb.governance.policy;
 
 import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.servicecomb.governance.utils.GovernanceUtils;
@@ -48,6 +49,9 @@
 
   public static final String DEFAULT_FAILURE_RESPONSE_STATUS_503 = "503";
 
+  public static final List<String> DEFAULT_STATUS_LIST = Arrays.asList(DEFAULT_FAILURE_RESPONSE_STATUS_502,
+      DEFAULT_FAILURE_RESPONSE_STATUS_503);
+
   private float failureRateThreshold = DEFAULT_FAILURE_RATE_THRESHOLD;
 
   private float slowCallRateThreshold = DEFAULT_SLOW_CALL_RATE_THRESHOLD;
@@ -65,7 +69,7 @@
   private String slidingWindowSize = DEFAULT_SLIDING_WINDOW_SIZE;
 
   //status code that need record as a failure
-  private List<String> recordFailureStatus = new ArrayList<>();
+  private List<String> recordFailureStatus = DEFAULT_STATUS_LIST;
 
   //force close this circuit breaker. This parameter is not used by circuit breaker directly
   private boolean forceClosed = false;
@@ -193,14 +197,17 @@
 
   public List<String> getRecordFailureStatus() {
     if (CollectionUtils.isEmpty(this.recordFailureStatus)) {
-      this.recordFailureStatus.add(DEFAULT_FAILURE_RESPONSE_STATUS_502);
-      this.recordFailureStatus.add(DEFAULT_FAILURE_RESPONSE_STATUS_503);
+      return DEFAULT_STATUS_LIST;
     }
     return this.recordFailureStatus;
   }
 
   public void setRecordFailureStatus(List<String> recordFailureStatus) {
-    this.recordFailureStatus = recordFailureStatus;
+    if (recordFailureStatus == null) {
+      return;
+    }
+    this.recordFailureStatus = recordFailureStatus.stream().filter(e -> !StringUtils.isEmpty(e))
+        .collect(Collectors.toList());
   }
 
   public boolean isForceClosed() {
@@ -230,6 +237,7 @@
         ", minimumNumberOfCalls=" + minimumNumberOfCalls +
         ", slidingWindowType='" + slidingWindowType + '\'' +
         ", slidingWindowSize=" + slidingWindowSize +
+        ", recordFailureStatus=" + recordFailureStatus +
         '}';
   }
 }
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java b/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
index 95be919..726121a 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
@@ -17,8 +17,9 @@
 package org.apache.servicecomb.governance.policy;
 
 import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.util.CollectionUtils;
@@ -27,12 +28,15 @@
 
   public static final int DEFAULT_MAX_ATTEMPTS = 3;
 
-  public static final Duration DEFAULT_WAIT_DURATION = Duration.ofMillis(10);
+  public static final Duration DEFAULT_WAIT_DURATION = Duration.ofMillis(1);
 
   public static final String DEFAULT_RETRY_ON_RESPONSE_STATUS_502 = "502";
 
   public static final String DEFAULT_RETRY_ON_RESPONSE_STATUS_503 = "503";
 
+  public static final List<String> DEFAULT_STATUS_LIST = Arrays.asList(DEFAULT_RETRY_ON_RESPONSE_STATUS_502,
+      DEFAULT_RETRY_ON_RESPONSE_STATUS_503);
+
   private static final Duration INITIAL_INTERVAL = Duration.ofMillis(1000);
 
   private static final float MULTIPLIER = 2;
@@ -48,7 +52,7 @@
   private String waitDuration = DEFAULT_WAIT_DURATION.toString();
 
   //status code that need retry
-  private List<String> retryOnResponseStatus = new ArrayList<>();
+  private List<String> retryOnResponseStatus = DEFAULT_STATUS_LIST;
 
   //retry strategy
   private String retryStrategy = DEFAULT_RETRY_STRATEGY;
@@ -71,14 +75,17 @@
 
   public List<String> getRetryOnResponseStatus() {
     if (CollectionUtils.isEmpty(retryOnResponseStatus)) {
-      this.retryOnResponseStatus.add(DEFAULT_RETRY_ON_RESPONSE_STATUS_502);
-      this.retryOnResponseStatus.add(DEFAULT_RETRY_ON_RESPONSE_STATUS_503);
+      return DEFAULT_STATUS_LIST;
     }
     return retryOnResponseStatus;
   }
 
   public void setRetryOnResponseStatus(List<String> retryOnResponseStatus) {
-    this.retryOnResponseStatus = retryOnResponseStatus;
+    if (retryOnResponseStatus == null) {
+      return;
+    }
+    this.retryOnResponseStatus = retryOnResponseStatus.stream().filter(e -> !StringUtils.isEmpty(e))
+        .collect(Collectors.toList());
   }
 
   public int getMaxAttempts() {
@@ -90,7 +97,7 @@
   }
 
   public String getWaitDuration() {
-    return Duration.parse(waitDuration).toMillis() < 10 ? DEFAULT_WAIT_DURATION.toString() : waitDuration;
+    return Duration.parse(waitDuration).toMillis() < 1 ? DEFAULT_WAIT_DURATION.toString() : waitDuration;
   }
 
   public void setWaitDuration(String waitDuration) {
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java b/governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
similarity index 63%
rename from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
rename to governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
index f5cfbf7..edd4b60 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
@@ -15,16 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.servicecomb.handler.governance;
+package org.apache.servicecomb.governance.properties;
 
-import org.apache.servicecomb.core.Handler;
-import org.apache.servicecomb.core.Invocation;
-import org.apache.servicecomb.swagger.invocation.AsyncResponse;
+import org.apache.servicecomb.governance.policy.BulkheadPolicy;
 
-public class ConsumerGovernanceHandler implements Handler {
-  // an empty implementation, will add possible features in future.
+public class InstanceBulkheadProperties extends PolicyProperties<BulkheadPolicy> {
+  public static final String MATCH_INSTANCE_BULKHEAD_KEY = "servicecomb.instanceBulkhead";
+
+  public InstanceBulkheadProperties() {
+    super(MATCH_INSTANCE_BULKHEAD_KEY);
+  }
+
   @Override
-  public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
-    invocation.next(asyncResp);
+  public Class<BulkheadPolicy> getEntityClass() {
+    return BulkheadPolicy.class;
   }
 }
diff --git a/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java b/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java
new file mode 100644
index 0000000..9c84287
--- /dev/null
+++ b/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.servicecomb.governance.handler.ext.AbstractFailurePredictor;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AbstractFailurePredictorTest {
+  class MyAbstractFailurePredictor extends AbstractFailurePredictor {
+    MyAbstractFailurePredictor() {
+    }
+
+    @Override
+    protected String extractStatusCode(Object result) {
+      return (String) result;
+    }
+
+    @Override
+    public boolean isFailedResult(Throwable e) {
+      return super.isFailedResult(e);
+    }
+  }
+
+  @Test
+  public void testCodeMatch() {
+    AbstractFailurePredictor predictor = new MyAbstractFailurePredictor();
+    List<String> statusList = Arrays.asList("500");
+    Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+
+    statusList = Arrays.asList("5x0");
+    Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+
+    statusList = Arrays.asList(null, "xx", "5x0");
+    Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+    Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+  }
+}
diff --git a/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java b/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
index bc943a0..e0c5596 100644
--- a/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
+++ b/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
@@ -164,7 +164,7 @@
 
   @Test
   public void test_all_bean_is_loaded() {
-    Assertions.assertEquals(6, propertiesList.size());
+    Assertions.assertEquals(7, propertiesList.size());
   }
 
   @Test
diff --git a/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java b/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java
new file mode 100644
index 0000000..2377aab
--- /dev/null
+++ b/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ContextConfiguration;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadFullException;
+import io.github.resilience4j.decorators.Decorators;
+import io.github.resilience4j.decorators.Decorators.DecorateCheckedSupplier;
+
+@SpringBootTest
+@ContextConfiguration(classes = {GovernanceConfiguration.class, MockConfiguration.class})
+public class InstanceBulkheadHandlerTest {
+  private InstanceBulkheadHandler instanceBulkheadHandler;
+
+  @Autowired
+  public void setInstanceBulkheadHandler(InstanceBulkheadHandler instanceBulkheadHandler) {
+    this.instanceBulkheadHandler = instanceBulkheadHandler;
+  }
+
+  @Test
+  public void test_instance_bulkhead_work() throws Throwable {
+
+    // instance1
+    DecorateCheckedSupplier<String> dsInstance1 = Decorators.ofCheckedSupplier(() -> "wake");
+
+    GovernanceRequest requestInstance1 = new GovernanceRequest();
+    requestInstance1.setInstanceId("instance01");
+    requestInstance1.setServiceName("service01");
+    requestInstance1.setUri("/test");
+
+    Bulkhead bulkheadInstance1 = instanceBulkheadHandler.getActuator(requestInstance1);
+    dsInstance1.withBulkhead(bulkheadInstance1);
+
+    // instance2
+    DecorateCheckedSupplier<String> dsInstance2 = Decorators.ofCheckedSupplier(() -> {
+      Thread.sleep(1000);
+      return "sleep";
+    });
+
+    GovernanceRequest requestInstance2 = new GovernanceRequest();
+    requestInstance2.setInstanceId("instance02");
+    requestInstance2.setServiceName("service01");
+    requestInstance2.setUri("/test");
+
+    Bulkhead bulkheadInstance2 = instanceBulkheadHandler.getActuator(requestInstance2);
+    dsInstance2.withBulkhead(bulkheadInstance2);
+
+    Executor executor = Executors.newFixedThreadPool(4);
+    AtomicInteger wakeCount = new AtomicInteger(0);
+    AtomicInteger sleepCount = new AtomicInteger(0);
+    AtomicInteger errorCount = new AtomicInteger(0);
+    AtomicInteger rejectCount = new AtomicInteger(0);
+    CountDownLatch countDownLatch = new CountDownLatch(100);
+    for (int i = 0; i < 100; i++) {
+      final int num = i;
+      executor.execute(() -> {
+        // 50% for each server
+        if (num % 2 == 0) {
+          runCommand(dsInstance1, wakeCount, sleepCount, errorCount, rejectCount, countDownLatch);
+        } else {
+          runCommand(dsInstance2, wakeCount, sleepCount, errorCount, rejectCount, countDownLatch);
+        }
+      });
+    }
+    countDownLatch.await(100, TimeUnit.SECONDS);
+    Assertions.assertEquals(50, wakeCount.get());
+    Assertions.assertEquals(2, sleepCount.get());
+    Assertions.assertEquals(0, errorCount.get());
+    Assertions.assertEquals(48, rejectCount.get());
+  }
+
+  private void runCommand(DecorateCheckedSupplier<String> ds, AtomicInteger wakeCount, AtomicInteger sleepCount,
+      AtomicInteger errorCount, AtomicInteger rejectCount, CountDownLatch countDownLatch) {
+    try {
+      String result = ds.get();
+      if ("wake".equals(result)) {
+        wakeCount.incrementAndGet();
+      } else if ("sleep".equals(result)) {
+        sleepCount.incrementAndGet();
+      } else {
+        errorCount.incrementAndGet();
+      }
+    } catch (BulkheadFullException e) {
+      rejectCount.incrementAndGet();
+    } catch (Throwable e) {
+      errorCount.incrementAndGet();
+    }
+    countDownLatch.countDown();
+  }
+}
diff --git a/governance/src/test/resources/application.yaml b/governance/src/test/resources/application.yaml
index e5c0842..5a98ad6 100644
--- a/governance/src/test/resources/application.yaml
+++ b/governance/src/test/resources/application.yaml
@@ -122,6 +122,10 @@
       slidingWindowSize: 2
       slidingWindowType: COUNT_BASED
       waitDurationInOpenState: 1000
+  instanceBulkhead:
+    demo-allOperation: |
+      maxConcurrentCalls: 2
+      maxWaitDuration: 10
   faultInjection:
     demo-fallback-ThrowException: |
       type: abort
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java
new file mode 100644
index 0000000..65e4346
--- /dev/null
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.handler.governance;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Supplier;
+
+import org.apache.servicecomb.core.Handler;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.governance.MatchType;
+import org.apache.servicecomb.foundation.common.utils.BeanUtils;
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.apache.servicecomb.swagger.invocation.AsyncResponse;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadFullException;
+import io.github.resilience4j.decorators.Decorators;
+import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage;
+
+public class ConsumerInstanceBulkheadHandler implements Handler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerInstanceBulkheadHandler.class);
+
+  private final InstanceBulkheadHandler instanceBulkheadHandler = BeanUtils.getBean(InstanceBulkheadHandler.class);
+
+  @Override
+  public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
+    if (invocation.getEndpoint() == null) {
+      invocation.next(asyncResp);
+      return;
+    }
+    Supplier<CompletionStage<Response>> next = createBusinessCompletionStageSupplier(invocation);
+    DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
+    GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+    request.setServiceName(invocation.getMicroserviceName());
+    request.setInstanceId(invocation.getEndpoint().getMicroserviceInstance().getInstanceId());
+
+    addBulkhead(dcs, request);
+
+    dcs.get().whenComplete((r, e) -> {
+      if (e == null) {
+        asyncResp.complete(r);
+        return;
+      }
+
+      if (e instanceof BulkheadFullException) {
+        // return 503 so that consumer can retry
+        asyncResp.complete(
+            Response.failResp(new InvocationException(503, "bulkhead is full and does not permit further calls.",
+                new CommonExceptionData("bulkhead is full and does not permit further calls."))));
+        LOGGER.warn("bulkhead is full and does not permit further calls by policy : {}", e.getMessage());
+      } else {
+        asyncResp.complete(Response.createProducerFail(e));
+      }
+    });
+  }
+
+  private void addBulkhead(DecorateCompletionStage<Response> dcs, GovernanceRequest request) {
+    Bulkhead bulkhead = instanceBulkheadHandler.getActuator(request);
+    if (bulkhead != null) {
+      dcs.withBulkhead(bulkhead);
+    }
+  }
+
+  private Supplier<CompletionStage<Response>> createBusinessCompletionStageSupplier(Invocation invocation) {
+    return () -> {
+      CompletableFuture<Response> result = new CompletableFuture<>();
+      try {
+        invocation.next(result::complete);
+      } catch (Exception e) {
+        result.completeExceptionally(e);
+      }
+      return result;
+    };
+  }
+}
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
new file mode 100644
index 0000000..73d3dbb
--- /dev/null
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.handler.governance;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Supplier;
+
+import org.apache.servicecomb.core.Handler;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.governance.MatchType;
+import org.apache.servicecomb.foundation.common.utils.BeanUtils;
+import org.apache.servicecomb.governance.handler.InstanceIsolationHandler;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.apache.servicecomb.swagger.invocation.AsyncResponse;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
+import io.github.resilience4j.circuitbreaker.CircuitBreaker;
+import io.github.resilience4j.decorators.Decorators;
+import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage;
+
+public class ConsumerInstanceIsolationHandler implements Handler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerInstanceIsolationHandler.class);
+
+  private final InstanceIsolationHandler instanceIsolationHandler = BeanUtils.getBean(InstanceIsolationHandler.class);
+
+  @Override
+  public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
+    if (invocation.getEndpoint() == null) {
+      invocation.next(asyncResp);
+      return;
+    }
+    Supplier<CompletionStage<Response>> next = createBusinessCompletionStageSupplier(invocation);
+    DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
+    GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+    request.setServiceName(invocation.getMicroserviceName());
+    request.setInstanceId(invocation.getEndpoint().getMicroserviceInstance().getInstanceId());
+
+    addCircuitBreaker(dcs, request);
+
+    dcs.get().whenComplete((r, e) -> {
+      if (e == null) {
+        asyncResp.complete(r);
+        return;
+      }
+
+      if (e instanceof CallNotPermittedException) {
+        // return 503 so that consumer can retry
+        asyncResp.complete(
+            Response.failResp(new InvocationException(503, "instance isolation circuitBreaker is open.",
+                new CommonExceptionData("instance isolation circuitBreaker is open."))));
+        LOGGER.warn("instance isolation circuitBreaker is open by policy : {}", e.getMessage());
+      } else {
+        asyncResp.complete(Response.createProducerFail(e));
+      }
+    });
+  }
+
+  private void addCircuitBreaker(DecorateCompletionStage<Response> dcs, GovernanceRequest request) {
+    CircuitBreaker circuitBreaker = instanceIsolationHandler.getActuator(request);
+    if (circuitBreaker != null) {
+      dcs.withCircuitBreaker(circuitBreaker);
+    }
+  }
+
+  private Supplier<CompletionStage<Response>> createBusinessCompletionStageSupplier(Invocation invocation) {
+    return () -> {
+      CompletableFuture<Response> result = new CompletableFuture<>();
+      try {
+        invocation.next(result::complete);
+      } catch (Exception e) {
+        result.completeExceptionally(e);
+      }
+      return result;
+    };
+  }
+}
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
index 8ae3df8..f3142ab 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
@@ -116,16 +116,7 @@
     return () -> {
       CompletableFuture<Response> result = new CompletableFuture<>();
       try {
-        invocation.next(response -> {
-          if (response.isFailed()) {
-            // For failed response, create a fail to make circuit breaker work.
-            // Users application maybe much complicated than this simple logic,
-            // while they need to customize which error will cause circuit breaker.
-            result.completeExceptionally(response.getResult());
-          } else {
-            result.complete(response);
-          }
-        });
+        invocation.next(result::complete);
       } catch (Exception e) {
         result.completeExceptionally(e);
       }
diff --git a/handlers/handler-governance/src/main/resources/config/cse.handler.xml b/handlers/handler-governance/src/main/resources/config/cse.handler.xml
index c7383f0..ab5a82c 100644
--- a/handlers/handler-governance/src/main/resources/config/cse.handler.xml
+++ b/handlers/handler-governance/src/main/resources/config/cse.handler.xml
@@ -18,6 +18,8 @@
 <config>
   <handler id="governance-provider"
     class="org.apache.servicecomb.handler.governance.ProviderGovernanceHandler"/>
-  <handler id="governance-consumer"
-    class="org.apache.servicecomb.handler.governance.ConsumerGovernanceHandler"/>
+  <handler id="instance-bulkhead-consumer"
+    class="org.apache.servicecomb.handler.governance.ConsumerInstanceBulkheadHandler"/>
+  <handler id="instance-isolation-consumer"
+    class="org.apache.servicecomb.handler.governance.ConsumerInstanceIsolationHandler"/>
 </config>