[SCB-2650][SCB-2290]support instance isolation and instance bulkhead (#3242)
diff --git a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
index 1eb345f..c663b70 100644
--- a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
+++ b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-client/src/main/resources/application.yml
@@ -37,7 +37,7 @@
handler:
chain:
Consumer:
- default: governance-consumer,loadbalance
+ default: loadbalance
Provider:
default: governance-provider
diff --git a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
index 7d4118f..972f3ef 100644
--- a/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
+++ b/demo/demo-zeroconfig-schemadiscovery-registry/demo-zeroconfig-schemadiscovery-registry-server/src/main/resources/application.yml
@@ -33,6 +33,6 @@
handler:
chain:
Consumer:
- default: governance-consumer,loadbalance
+ default: loadbalance
Provider:
default: governance-provider
\ No newline at end of file
diff --git a/foundations/foundation-common/src/main/resources/config/base/log4j.properties b/foundations/foundation-common/src/main/resources/config/base/log4j.properties
index 83de4e4..0cdb3bf 100644
--- a/foundations/foundation-common/src/main/resources/config/base/log4j.properties
+++ b/foundations/foundation-common/src/main/resources/config/base/log4j.properties
@@ -25,7 +25,7 @@
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d [%p] %m %l%n
+log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss,SSS/zzz}][%t][%p]%m %l%n
log4j.appender.paas=org.apache.servicecomb.foundation.common.utils.RollingFileAppenderExt
log4j.appender.paas.file=${paas.logs.dir}${paas.logs.file}
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java b/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
index 03eb521..9307da4 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/GovernanceConfiguration.java
@@ -22,6 +22,7 @@
import org.apache.servicecomb.governance.handler.BulkheadHandler;
import org.apache.servicecomb.governance.handler.CircuitBreakerHandler;
import org.apache.servicecomb.governance.handler.FaultInjectionHandler;
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
import org.apache.servicecomb.governance.handler.InstanceIsolationHandler;
import org.apache.servicecomb.governance.handler.RateLimitingHandler;
import org.apache.servicecomb.governance.handler.RetryHandler;
@@ -38,6 +39,7 @@
import org.apache.servicecomb.governance.properties.BulkheadProperties;
import org.apache.servicecomb.governance.properties.CircuitBreakerProperties;
import org.apache.servicecomb.governance.properties.FaultInjectionProperties;
+import org.apache.servicecomb.governance.properties.InstanceBulkheadProperties;
import org.apache.servicecomb.governance.properties.InstanceIsolationProperties;
import org.apache.servicecomb.governance.properties.MatchProperties;
import org.apache.servicecomb.governance.properties.RateLimitProperties;
@@ -59,6 +61,11 @@
}
@Bean
+ public InstanceBulkheadProperties instanceBulkheadProperties() {
+ return new InstanceBulkheadProperties();
+ }
+
+ @Bean
public CircuitBreakerProperties circuitBreakerProperties() {
return new CircuitBreakerProperties();
}
@@ -95,6 +102,11 @@
}
@Bean
+ public InstanceBulkheadHandler instanceBulkheadHandler(InstanceBulkheadProperties instanceBulkheadProperties) {
+ return new InstanceBulkheadHandler(instanceBulkheadProperties);
+ }
+
+ @Bean
public CircuitBreakerHandler circuitBreakerHandler(CircuitBreakerProperties circuitBreakerProperties,
AbstractCircuitBreakerExtension circuitBreakerExtension,
ObjectProvider<MeterRegistry> meterRegistry) {
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java b/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java
new file mode 100644
index 0000000..bdfe7d0
--- /dev/null
+++ b/governance/src/main/java/org/apache/servicecomb/governance/handler/InstanceBulkheadHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance.handler;
+
+import java.time.Duration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.apache.servicecomb.governance.policy.BulkheadPolicy;
+import org.apache.servicecomb.governance.properties.InstanceBulkheadProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadConfig;
+import io.github.resilience4j.bulkhead.BulkheadRegistry;
+
+public class InstanceBulkheadHandler extends AbstractGovernanceHandler<Bulkhead, BulkheadPolicy> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstanceBulkheadHandler.class);
+
+ private final InstanceBulkheadProperties bulkheadProperties;
+
+ public InstanceBulkheadHandler(InstanceBulkheadProperties bulkheadProperties) {
+ this.bulkheadProperties = bulkheadProperties;
+ }
+
+ @Override
+ protected String createKey(GovernanceRequest governanceRequest, BulkheadPolicy policy) {
+ return InstanceBulkheadProperties.MATCH_INSTANCE_BULKHEAD_KEY
+ + "." + governanceRequest.getServiceName()
+ + "." + policy.getName()
+ + "." + governanceRequest.getInstanceId();
+ }
+
+ @Override
+ protected void onConfigurationChanged(String key) {
+ if (key.startsWith(InstanceBulkheadProperties.MATCH_INSTANCE_BULKHEAD_KEY)) {
+ for (String processorKey : processors.keySet()) {
+ if (processorKey.startsWith(key)) {
+ processors.remove(processorKey);
+ }
+ }
+ }
+ }
+
+ @Override
+ public BulkheadPolicy matchPolicy(GovernanceRequest governanceRequest) {
+ if (StringUtils.isEmpty(governanceRequest.getServiceName()) || StringUtils.isEmpty(
+ governanceRequest.getInstanceId())) {
+ LOGGER.info("Instance bulkhead is not properly configured, service id or instance id is empty.");
+ return null;
+ }
+ return matchersManager.match(governanceRequest, bulkheadProperties.getParsedEntity());
+ }
+
+ @Override
+ public Bulkhead createProcessor(String key, GovernanceRequest governanceRequest, BulkheadPolicy policy) {
+ return getBulkhead(key, policy);
+ }
+
+ private Bulkhead getBulkhead(String key, BulkheadPolicy policy) {
+ LOGGER.info("applying new policy {} for {}", key, policy.toString());
+
+ BulkheadConfig config = BulkheadConfig.custom()
+ .maxConcurrentCalls(policy.getMaxConcurrentCalls())
+ .maxWaitDuration(Duration.parse(policy.getMaxWaitDuration()))
+ .build();
+
+ BulkheadRegistry registry = BulkheadRegistry.of(config);
+
+ return registry.bulkhead(key, config);
+ }
+}
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java b/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
index 8af8ae6..e7fea24 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/handler/ext/AbstractFailurePredictor.java
@@ -38,12 +38,15 @@
}
private static boolean statusCodeMatch(String status, String responseStatus) {
- if (3 != status.length()) {
+ if (status == null) {
+ return false;
+ }
+ if (responseStatus.length() != status.length()) {
return false;
}
char[] statusChar = status.toCharArray();
char[] responseChar = responseStatus.toCharArray();
- return IntStream.range(0, 3).noneMatch(i ->
+ return IntStream.range(0, statusChar.length).noneMatch(i ->
statusChar[i] != responseChar[i] && statusChar[i] != 'x');
}
}
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java b/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
index 9996d1b..1d1bdfe 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/policy/CircuitBreakerPolicy.java
@@ -17,8 +17,9 @@
package org.apache.servicecomb.governance.policy;
import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.governance.utils.GovernanceUtils;
@@ -48,6 +49,9 @@
public static final String DEFAULT_FAILURE_RESPONSE_STATUS_503 = "503";
+ public static final List<String> DEFAULT_STATUS_LIST = Arrays.asList(DEFAULT_FAILURE_RESPONSE_STATUS_502,
+ DEFAULT_FAILURE_RESPONSE_STATUS_503);
+
private float failureRateThreshold = DEFAULT_FAILURE_RATE_THRESHOLD;
private float slowCallRateThreshold = DEFAULT_SLOW_CALL_RATE_THRESHOLD;
@@ -65,7 +69,7 @@
private String slidingWindowSize = DEFAULT_SLIDING_WINDOW_SIZE;
//status code that need record as a failure
- private List<String> recordFailureStatus = new ArrayList<>();
+ private List<String> recordFailureStatus = DEFAULT_STATUS_LIST;
//force close this circuit breaker. This parameter is not used by circuit breaker directly
private boolean forceClosed = false;
@@ -193,14 +197,17 @@
public List<String> getRecordFailureStatus() {
if (CollectionUtils.isEmpty(this.recordFailureStatus)) {
- this.recordFailureStatus.add(DEFAULT_FAILURE_RESPONSE_STATUS_502);
- this.recordFailureStatus.add(DEFAULT_FAILURE_RESPONSE_STATUS_503);
+ return DEFAULT_STATUS_LIST;
}
return this.recordFailureStatus;
}
public void setRecordFailureStatus(List<String> recordFailureStatus) {
- this.recordFailureStatus = recordFailureStatus;
+ if (recordFailureStatus == null) {
+ return;
+ }
+ this.recordFailureStatus = recordFailureStatus.stream().filter(e -> !StringUtils.isEmpty(e))
+ .collect(Collectors.toList());
}
public boolean isForceClosed() {
@@ -230,6 +237,7 @@
", minimumNumberOfCalls=" + minimumNumberOfCalls +
", slidingWindowType='" + slidingWindowType + '\'' +
", slidingWindowSize=" + slidingWindowSize +
+ ", recordFailureStatus=" + recordFailureStatus +
'}';
}
}
diff --git a/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java b/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
index 95be919..726121a 100644
--- a/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/policy/RetryPolicy.java
@@ -17,8 +17,9 @@
package org.apache.servicecomb.governance.policy;
import java.time.Duration;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils;
@@ -27,12 +28,15 @@
public static final int DEFAULT_MAX_ATTEMPTS = 3;
- public static final Duration DEFAULT_WAIT_DURATION = Duration.ofMillis(10);
+ public static final Duration DEFAULT_WAIT_DURATION = Duration.ofMillis(1);
public static final String DEFAULT_RETRY_ON_RESPONSE_STATUS_502 = "502";
public static final String DEFAULT_RETRY_ON_RESPONSE_STATUS_503 = "503";
+ public static final List<String> DEFAULT_STATUS_LIST = Arrays.asList(DEFAULT_RETRY_ON_RESPONSE_STATUS_502,
+ DEFAULT_RETRY_ON_RESPONSE_STATUS_503);
+
private static final Duration INITIAL_INTERVAL = Duration.ofMillis(1000);
private static final float MULTIPLIER = 2;
@@ -48,7 +52,7 @@
private String waitDuration = DEFAULT_WAIT_DURATION.toString();
//status code that need retry
- private List<String> retryOnResponseStatus = new ArrayList<>();
+ private List<String> retryOnResponseStatus = DEFAULT_STATUS_LIST;
//retry strategy
private String retryStrategy = DEFAULT_RETRY_STRATEGY;
@@ -71,14 +75,17 @@
public List<String> getRetryOnResponseStatus() {
if (CollectionUtils.isEmpty(retryOnResponseStatus)) {
- this.retryOnResponseStatus.add(DEFAULT_RETRY_ON_RESPONSE_STATUS_502);
- this.retryOnResponseStatus.add(DEFAULT_RETRY_ON_RESPONSE_STATUS_503);
+ return DEFAULT_STATUS_LIST;
}
return retryOnResponseStatus;
}
public void setRetryOnResponseStatus(List<String> retryOnResponseStatus) {
- this.retryOnResponseStatus = retryOnResponseStatus;
+ if (retryOnResponseStatus == null) {
+ return;
+ }
+ this.retryOnResponseStatus = retryOnResponseStatus.stream().filter(e -> !StringUtils.isEmpty(e))
+ .collect(Collectors.toList());
}
public int getMaxAttempts() {
@@ -90,7 +97,7 @@
}
public String getWaitDuration() {
- return Duration.parse(waitDuration).toMillis() < 10 ? DEFAULT_WAIT_DURATION.toString() : waitDuration;
+ return Duration.parse(waitDuration).toMillis() < 1 ? DEFAULT_WAIT_DURATION.toString() : waitDuration;
}
public void setWaitDuration(String waitDuration) {
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java b/governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
similarity index 63%
rename from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
rename to governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
index f5cfbf7..edd4b60 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
+++ b/governance/src/main/java/org/apache/servicecomb/governance/properties/InstanceBulkheadProperties.java
@@ -15,16 +15,19 @@
* limitations under the License.
*/
-package org.apache.servicecomb.handler.governance;
+package org.apache.servicecomb.governance.properties;
-import org.apache.servicecomb.core.Handler;
-import org.apache.servicecomb.core.Invocation;
-import org.apache.servicecomb.swagger.invocation.AsyncResponse;
+import org.apache.servicecomb.governance.policy.BulkheadPolicy;
-public class ConsumerGovernanceHandler implements Handler {
- // an empty implementation, will add possible features in future.
+public class InstanceBulkheadProperties extends PolicyProperties<BulkheadPolicy> {
+ public static final String MATCH_INSTANCE_BULKHEAD_KEY = "servicecomb.instanceBulkhead";
+
+ public InstanceBulkheadProperties() {
+ super(MATCH_INSTANCE_BULKHEAD_KEY);
+ }
+
@Override
- public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
- invocation.next(asyncResp);
+ public Class<BulkheadPolicy> getEntityClass() {
+ return BulkheadPolicy.class;
}
}
diff --git a/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java b/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java
new file mode 100644
index 0000000..9c84287
--- /dev/null
+++ b/governance/src/test/java/org/apache/servicecomb/governance/AbstractFailurePredictorTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.servicecomb.governance.handler.ext.AbstractFailurePredictor;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AbstractFailurePredictorTest {
+ class MyAbstractFailurePredictor extends AbstractFailurePredictor {
+ MyAbstractFailurePredictor() {
+ }
+
+ @Override
+ protected String extractStatusCode(Object result) {
+ return (String) result;
+ }
+
+ @Override
+ public boolean isFailedResult(Throwable e) {
+ return super.isFailedResult(e);
+ }
+ }
+
+ @Test
+ public void testCodeMatch() {
+ AbstractFailurePredictor predictor = new MyAbstractFailurePredictor();
+ List<String> statusList = Arrays.asList("500");
+ Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+
+ statusList = Arrays.asList("5x0");
+ Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+
+ statusList = Arrays.asList(null, "xx", "5x0");
+ Assertions.assertTrue(predictor.isFailedResult(statusList, "500"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "502"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "400"));
+ Assertions.assertFalse(predictor.isFailedResult(statusList, "444"));
+ }
+}
diff --git a/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java b/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
index bc943a0..e0c5596 100644
--- a/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
+++ b/governance/src/test/java/org/apache/servicecomb/governance/GovernancePropertiesTest.java
@@ -164,7 +164,7 @@
@Test
public void test_all_bean_is_loaded() {
- Assertions.assertEquals(6, propertiesList.size());
+ Assertions.assertEquals(7, propertiesList.size());
}
@Test
diff --git a/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java b/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java
new file mode 100644
index 0000000..2377aab
--- /dev/null
+++ b/governance/src/test/java/org/apache/servicecomb/governance/InstanceBulkheadHandlerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.governance;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ContextConfiguration;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadFullException;
+import io.github.resilience4j.decorators.Decorators;
+import io.github.resilience4j.decorators.Decorators.DecorateCheckedSupplier;
+
+@SpringBootTest
+@ContextConfiguration(classes = {GovernanceConfiguration.class, MockConfiguration.class})
+public class InstanceBulkheadHandlerTest {
+ private InstanceBulkheadHandler instanceBulkheadHandler;
+
+ @Autowired
+ public void setInstanceBulkheadHandler(InstanceBulkheadHandler instanceBulkheadHandler) {
+ this.instanceBulkheadHandler = instanceBulkheadHandler;
+ }
+
+ @Test
+ public void test_instance_bulkhead_work() throws Throwable {
+
+ // instance1
+ DecorateCheckedSupplier<String> dsInstance1 = Decorators.ofCheckedSupplier(() -> "wake");
+
+ GovernanceRequest requestInstance1 = new GovernanceRequest();
+ requestInstance1.setInstanceId("instance01");
+ requestInstance1.setServiceName("service01");
+ requestInstance1.setUri("/test");
+
+ Bulkhead bulkheadInstance1 = instanceBulkheadHandler.getActuator(requestInstance1);
+ dsInstance1.withBulkhead(bulkheadInstance1);
+
+ // instance2
+ DecorateCheckedSupplier<String> dsInstance2 = Decorators.ofCheckedSupplier(() -> {
+ Thread.sleep(1000);
+ return "sleep";
+ });
+
+ GovernanceRequest requestInstance2 = new GovernanceRequest();
+ requestInstance2.setInstanceId("instance02");
+ requestInstance2.setServiceName("service01");
+ requestInstance2.setUri("/test");
+
+ Bulkhead bulkheadInstance2 = instanceBulkheadHandler.getActuator(requestInstance2);
+ dsInstance2.withBulkhead(bulkheadInstance2);
+
+ Executor executor = Executors.newFixedThreadPool(4);
+ AtomicInteger wakeCount = new AtomicInteger(0);
+ AtomicInteger sleepCount = new AtomicInteger(0);
+ AtomicInteger errorCount = new AtomicInteger(0);
+ AtomicInteger rejectCount = new AtomicInteger(0);
+ CountDownLatch countDownLatch = new CountDownLatch(100);
+ for (int i = 0; i < 100; i++) {
+ final int num = i;
+ executor.execute(() -> {
+ // 50% for each server
+ if (num % 2 == 0) {
+ runCommand(dsInstance1, wakeCount, sleepCount, errorCount, rejectCount, countDownLatch);
+ } else {
+ runCommand(dsInstance2, wakeCount, sleepCount, errorCount, rejectCount, countDownLatch);
+ }
+ });
+ }
+ countDownLatch.await(100, TimeUnit.SECONDS);
+ Assertions.assertEquals(50, wakeCount.get());
+ Assertions.assertEquals(2, sleepCount.get());
+ Assertions.assertEquals(0, errorCount.get());
+ Assertions.assertEquals(48, rejectCount.get());
+ }
+
+ private void runCommand(DecorateCheckedSupplier<String> ds, AtomicInteger wakeCount, AtomicInteger sleepCount,
+ AtomicInteger errorCount, AtomicInteger rejectCount, CountDownLatch countDownLatch) {
+ try {
+ String result = ds.get();
+ if ("wake".equals(result)) {
+ wakeCount.incrementAndGet();
+ } else if ("sleep".equals(result)) {
+ sleepCount.incrementAndGet();
+ } else {
+ errorCount.incrementAndGet();
+ }
+ } catch (BulkheadFullException e) {
+ rejectCount.incrementAndGet();
+ } catch (Throwable e) {
+ errorCount.incrementAndGet();
+ }
+ countDownLatch.countDown();
+ }
+}
diff --git a/governance/src/test/resources/application.yaml b/governance/src/test/resources/application.yaml
index e5c0842..5a98ad6 100644
--- a/governance/src/test/resources/application.yaml
+++ b/governance/src/test/resources/application.yaml
@@ -122,6 +122,10 @@
slidingWindowSize: 2
slidingWindowType: COUNT_BASED
waitDurationInOpenState: 1000
+ instanceBulkhead:
+ demo-allOperation: |
+ maxConcurrentCalls: 2
+ maxWaitDuration: 10
faultInjection:
demo-fallback-ThrowException: |
type: abort
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java
new file mode 100644
index 0000000..65e4346
--- /dev/null
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceBulkheadHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.handler.governance;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Supplier;
+
+import org.apache.servicecomb.core.Handler;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.governance.MatchType;
+import org.apache.servicecomb.foundation.common.utils.BeanUtils;
+import org.apache.servicecomb.governance.handler.InstanceBulkheadHandler;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.apache.servicecomb.swagger.invocation.AsyncResponse;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.github.resilience4j.bulkhead.Bulkhead;
+import io.github.resilience4j.bulkhead.BulkheadFullException;
+import io.github.resilience4j.decorators.Decorators;
+import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage;
+
+public class ConsumerInstanceBulkheadHandler implements Handler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerInstanceBulkheadHandler.class);
+
+ private final InstanceBulkheadHandler instanceBulkheadHandler = BeanUtils.getBean(InstanceBulkheadHandler.class);
+
+ @Override
+ public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
+ if (invocation.getEndpoint() == null) {
+ invocation.next(asyncResp);
+ return;
+ }
+ Supplier<CompletionStage<Response>> next = createBusinessCompletionStageSupplier(invocation);
+ DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
+ GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+ request.setServiceName(invocation.getMicroserviceName());
+ request.setInstanceId(invocation.getEndpoint().getMicroserviceInstance().getInstanceId());
+
+ addBulkhead(dcs, request);
+
+ dcs.get().whenComplete((r, e) -> {
+ if (e == null) {
+ asyncResp.complete(r);
+ return;
+ }
+
+ if (e instanceof BulkheadFullException) {
+ // return 503 so that consumer can retry
+ asyncResp.complete(
+ Response.failResp(new InvocationException(503, "bulkhead is full and does not permit further calls.",
+ new CommonExceptionData("bulkhead is full and does not permit further calls."))));
+ LOGGER.warn("bulkhead is full and does not permit further calls by policy : {}", e.getMessage());
+ } else {
+ asyncResp.complete(Response.createProducerFail(e));
+ }
+ });
+ }
+
+ private void addBulkhead(DecorateCompletionStage<Response> dcs, GovernanceRequest request) {
+ Bulkhead bulkhead = instanceBulkheadHandler.getActuator(request);
+ if (bulkhead != null) {
+ dcs.withBulkhead(bulkhead);
+ }
+ }
+
+ private Supplier<CompletionStage<Response>> createBusinessCompletionStageSupplier(Invocation invocation) {
+ return () -> {
+ CompletableFuture<Response> result = new CompletableFuture<>();
+ try {
+ invocation.next(result::complete);
+ } catch (Exception e) {
+ result.completeExceptionally(e);
+ }
+ return result;
+ };
+ }
+}
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
new file mode 100644
index 0000000..73d3dbb
--- /dev/null
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.servicecomb.handler.governance;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Supplier;
+
+import org.apache.servicecomb.core.Handler;
+import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.governance.MatchType;
+import org.apache.servicecomb.foundation.common.utils.BeanUtils;
+import org.apache.servicecomb.governance.handler.InstanceIsolationHandler;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.apache.servicecomb.swagger.invocation.AsyncResponse;
+import org.apache.servicecomb.swagger.invocation.Response;
+import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
+import io.github.resilience4j.circuitbreaker.CircuitBreaker;
+import io.github.resilience4j.decorators.Decorators;
+import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage;
+
+public class ConsumerInstanceIsolationHandler implements Handler {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerInstanceIsolationHandler.class);
+
+ private final InstanceIsolationHandler instanceIsolationHandler = BeanUtils.getBean(InstanceIsolationHandler.class);
+
+ @Override
+ public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
+ if (invocation.getEndpoint() == null) {
+ invocation.next(asyncResp);
+ return;
+ }
+ Supplier<CompletionStage<Response>> next = createBusinessCompletionStageSupplier(invocation);
+ DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
+ GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+ request.setServiceName(invocation.getMicroserviceName());
+ request.setInstanceId(invocation.getEndpoint().getMicroserviceInstance().getInstanceId());
+
+ addCircuitBreaker(dcs, request);
+
+ dcs.get().whenComplete((r, e) -> {
+ if (e == null) {
+ asyncResp.complete(r);
+ return;
+ }
+
+ if (e instanceof CallNotPermittedException) {
+ // return 503 so that consumer can retry
+ asyncResp.complete(
+ Response.failResp(new InvocationException(503, "instance isolation circuitBreaker is open.",
+ new CommonExceptionData("instance isolation circuitBreaker is open."))));
+ LOGGER.warn("instance isolation circuitBreaker is open by policy : {}", e.getMessage());
+ } else {
+ asyncResp.complete(Response.createProducerFail(e));
+ }
+ });
+ }
+
+ private void addCircuitBreaker(DecorateCompletionStage<Response> dcs, GovernanceRequest request) {
+ CircuitBreaker circuitBreaker = instanceIsolationHandler.getActuator(request);
+ if (circuitBreaker != null) {
+ dcs.withCircuitBreaker(circuitBreaker);
+ }
+ }
+
+ private Supplier<CompletionStage<Response>> createBusinessCompletionStageSupplier(Invocation invocation) {
+ return () -> {
+ CompletableFuture<Response> result = new CompletableFuture<>();
+ try {
+ invocation.next(result::complete);
+ } catch (Exception e) {
+ result.completeExceptionally(e);
+ }
+ return result;
+ };
+ }
+}
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
index 8ae3df8..f3142ab 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
@@ -116,16 +116,7 @@
return () -> {
CompletableFuture<Response> result = new CompletableFuture<>();
try {
- invocation.next(response -> {
- if (response.isFailed()) {
- // For failed response, create a fail to make circuit breaker work.
- // Users application maybe much complicated than this simple logic,
- // while they need to customize which error will cause circuit breaker.
- result.completeExceptionally(response.getResult());
- } else {
- result.complete(response);
- }
- });
+ invocation.next(result::complete);
} catch (Exception e) {
result.completeExceptionally(e);
}
diff --git a/handlers/handler-governance/src/main/resources/config/cse.handler.xml b/handlers/handler-governance/src/main/resources/config/cse.handler.xml
index c7383f0..ab5a82c 100644
--- a/handlers/handler-governance/src/main/resources/config/cse.handler.xml
+++ b/handlers/handler-governance/src/main/resources/config/cse.handler.xml
@@ -18,6 +18,8 @@
<config>
<handler id="governance-provider"
class="org.apache.servicecomb.handler.governance.ProviderGovernanceHandler"/>
- <handler id="governance-consumer"
- class="org.apache.servicecomb.handler.governance.ConsumerGovernanceHandler"/>
+ <handler id="instance-bulkhead-consumer"
+ class="org.apache.servicecomb.handler.governance.ConsumerInstanceBulkheadHandler"/>
+ <handler id="instance-isolation-consumer"
+ class="org.apache.servicecomb.handler.governance.ConsumerInstanceIsolationHandler"/>
</config>