[SCB-2662]trigger pull instance event when instance isolated (#3276)
diff --git a/demo/demo-jaxrs/jaxrs-client/pom.xml b/demo/demo-jaxrs/jaxrs-client/pom.xml
index b1614fc..5acbb0b 100644
--- a/demo/demo-jaxrs/jaxrs-client/pom.xml
+++ b/demo/demo-jaxrs/jaxrs-client/pom.xml
@@ -45,6 +45,10 @@
<groupId>org.apache.servicecomb</groupId>
<artifactId>provider-pojo</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.servicecomb</groupId>
+ <artifactId>handler-governance</artifactId>
+ </dependency>
<!-- can be added in local test -->
<!-- This jar will add an environment version={project.version}, and may cause spring boot application
startup fail -->
diff --git a/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/TestCodeFirstJaxrs.java b/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/TestCodeFirstJaxrs.java
index 66d0729..4e6cbd9 100644
--- a/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/TestCodeFirstJaxrs.java
+++ b/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/TestCodeFirstJaxrs.java
@@ -20,6 +20,7 @@
import java.io.File;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.servicecomb.core.provider.consumer.InvokerUtils;
import org.apache.servicecomb.demo.CategorizedTestCase;
@@ -27,6 +28,7 @@
import org.apache.servicecomb.demo.server.User;
import org.apache.servicecomb.foundation.vertx.http.ReadStreamPart;
import org.apache.servicecomb.provider.pojo.RpcReference;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.springframework.stereotype.Component;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -51,6 +53,39 @@
}
@Override
+ public void testHighwayTransport() throws Exception {
+ // test only once
+ testInstanceIsolation();
+ }
+
+ private void testInstanceIsolation() {
+ AtomicInteger e503Business = new AtomicInteger(0);
+ AtomicInteger e503CircuitBreaker = new AtomicInteger(0);
+
+ for (int i = 0; i < 30; i++) {
+ try {
+ InvokerUtils.syncInvoke(SERVICE_NAME, SCHEMA_ID, "instanceIsolationTest", null,
+ String.class);
+ } catch (InvocationException e) {
+ if (e.getStatusCode() == 503) {
+ if ("CommonExceptionData [message=business]".equals(e.getErrorData().toString())) {
+ e503Business.getAndIncrement();
+ } else if ("CommonExceptionData [message=instance isolation circuitBreaker is open.]".equals(
+ e.getErrorData().toString())) {
+ e503CircuitBreaker.getAndIncrement();
+ } else {
+ TestMgr.fail("not expected message");
+ }
+ } else {
+ TestMgr.fail("not expected code");
+ }
+ }
+ }
+ TestMgr.check(true, e503Business.get() >= 10);
+ TestMgr.check(true, e503CircuitBreaker.get() >= 10);
+ }
+
+ @Override
public void testRestTransport() throws Exception {
testDeleteAfterFinished();
}
diff --git a/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml b/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
index a0ac2a2..88b099f 100644
--- a/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
+++ b/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
@@ -25,12 +25,32 @@
handler:
chain:
Consumer:
- default: bizkeeper-consumer,loadbalance
+ default: loadbalance,instance-isolation-consumer,instance-bulkhead-consumer
loadbalance:
strategy:
name: mycustomrule
retryEnabled: true
+ filter:
+ isolation:
+ enabled: false
+
+ matchGroup:
+ instanceIsolationTest: |
+ matches:
+ - apiPath:
+ prefix: "/codeFirstJaxrs/instanceIsolationTest"
+ instanceIsolation:
+ instanceIsolationTest: |
+ minimumNumberOfCalls: 10
+ slidingWindowSize: 20
+ slidingWindowType: COUNT_BASED
+ failureRateThreshold: 50
+ slowCallRateThreshold: 100
+ slowCallDurationThreshold: 3000
+ waitDurationInOpenState: 200
+ permittedNumberOfCallsInHalfOpenState: 10
+
request:
timeout: 30000
jaxrs:
diff --git a/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/CodeFirstJaxrs.java b/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/CodeFirstJaxrs.java
index 3fedafd..81e350f 100644
--- a/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/CodeFirstJaxrs.java
+++ b/demo/demo-jaxrs/jaxrs-server/src/main/java/org/apache/servicecomb/demo/jaxrs/server/CodeFirstJaxrs.java
@@ -58,6 +58,7 @@
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.context.ContextUtils;
import org.apache.servicecomb.swagger.invocation.context.InvocationContext;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
@@ -150,7 +151,8 @@
public String testRawJsonString(String jsonInput) {
Map<String, String> person;
try {
- person = RestObjectMapperFactory.getRestObjectMapper().readValue(jsonInput.getBytes(StandardCharsets.UTF_8), Map.class);
+ person = RestObjectMapperFactory.getRestObjectMapper()
+ .readValue(jsonInput.getBytes(StandardCharsets.UTF_8), Map.class);
} catch (Exception e) {
e.printStackTrace();
return null;
@@ -209,7 +211,8 @@
public String testRawJsonAnnotation(@RawJsonRequestBody String jsonInput) {
Map<String, String> person;
try {
- person = RestObjectMapperFactory.getRestObjectMapper().readValue(jsonInput.getBytes(StandardCharsets.UTF_8), Map.class);
+ person = RestObjectMapperFactory.getRestObjectMapper()
+ .readValue(jsonInput.getBytes(StandardCharsets.UTF_8), Map.class);
} catch (Exception e) {
e.printStackTrace();
return null;
@@ -275,6 +278,12 @@
.setSubmittedFileName(name);
}
+ @Path("/instanceIsolationTest")
+ @GET
+ public String instanceIsolationTest() {
+ throw new InvocationException(503, "", "business");
+ }
+
private File createTempFile(String name, String content) throws IOException {
File systemTempFile = new File(System.getProperty("java.io.tmpdir"));
File file = new File(systemTempFile, name);
diff --git a/demo/demo-spring-boot-provider/demo-spring-boot-jaxrs-client/src/main/resources/application.yml b/demo/demo-spring-boot-provider/demo-spring-boot-jaxrs-client/src/main/resources/application.yml
index f0dc3cd..8631c2a 100644
--- a/demo/demo-spring-boot-provider/demo-spring-boot-jaxrs-client/src/main/resources/application.yml
+++ b/demo/demo-spring-boot-provider/demo-spring-boot-jaxrs-client/src/main/resources/application.yml
@@ -32,4 +32,4 @@
handler:
chain:
Consumer:
- default: bizkeeper-consumer,loadbalance
+ default: loadbalance,instance-isolation-consumer,instance-bulkhead-consumer
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/event/ServiceCenterEventBus.java b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/api/event/ServiceCenterEventBus.java
similarity index 94%
rename from service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/event/ServiceCenterEventBus.java
rename to foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/api/event/ServiceCenterEventBus.java
index 316cc32..204ee72 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/event/ServiceCenterEventBus.java
+++ b/foundations/foundation-registry/src/main/java/org/apache/servicecomb/registry/api/event/ServiceCenterEventBus.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.serviceregistry.event;
+package org.apache.servicecomb.registry.api.event;
import org.apache.servicecomb.foundation.common.event.SimpleEventBus;
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
index 73d3dbb..c6809de 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerInstanceIsolationHandler.java
@@ -27,6 +27,9 @@
import org.apache.servicecomb.foundation.common.utils.BeanUtils;
import org.apache.servicecomb.governance.handler.InstanceIsolationHandler;
import org.apache.servicecomb.governance.marker.GovernanceRequest;
+import org.apache.servicecomb.registry.api.MicroserviceKey;
+import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
+import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.exception.CommonExceptionData;
@@ -65,17 +68,27 @@
}
if (e instanceof CallNotPermittedException) {
+ LOGGER.warn("instance isolation circuitBreaker is open by policy : {}", e.getMessage());
+ ServiceCenterEventBus.getEventBus().post(createMicroserviceInstanceChangedEvent(invocation));
// return 503 so that consumer can retry
asyncResp.complete(
Response.failResp(new InvocationException(503, "instance isolation circuitBreaker is open.",
new CommonExceptionData("instance isolation circuitBreaker is open."))));
- LOGGER.warn("instance isolation circuitBreaker is open by policy : {}", e.getMessage());
} else {
asyncResp.complete(Response.createProducerFail(e));
}
});
}
+ private Object createMicroserviceInstanceChangedEvent(Invocation invocation) {
+ MicroserviceInstanceChangedEvent event = new MicroserviceInstanceChangedEvent();
+ MicroserviceKey key = new MicroserviceKey();
+ key.setAppId(invocation.getAppId());
+ key.setServiceName(invocation.getMicroserviceName());
+ event.setKey(key);
+ return event;
+ }
+
private void addCircuitBreaker(DecorateCompletionStage<Response> dcs, GovernanceRequest request) {
CircuitBreaker circuitBreaker = instanceIsolationHandler.getActuator(request);
if (circuitBreaker != null) {
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java
index e1bb76c..dbb31c7 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/auth/TokenCacheManager.java
@@ -33,7 +33,7 @@
import org.apache.servicecomb.service.center.client.model.RbacTokenRequest;
import org.apache.servicecomb.service.center.client.model.RbacTokenResponse;
import org.apache.servicecomb.serviceregistry.event.NotPermittedEvent;
-import org.apache.servicecomb.serviceregistry.event.ServiceCenterEventBus;
+import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
index 32effa0..9c5ed1d 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
@@ -63,7 +63,7 @@
import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient;
import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
import org.apache.servicecomb.serviceregistry.event.NotPermittedEvent;
-import org.apache.servicecomb.serviceregistry.event.ServiceCenterEventBus;
+import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus;
import org.apache.servicecomb.serviceregistry.task.HeartbeatResult;
import org.apache.servicecomb.serviceregistry.task.MicroserviceInstanceHeartbeatTask;
import org.slf4j.Logger;
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ClassificationAddress.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ClassificationAddress.java
index b81e070..46da313 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ClassificationAddress.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/refresh/ClassificationAddress.java
@@ -43,7 +43,7 @@
import org.apache.servicecomb.registry.cache.InstanceCacheManager;
import org.apache.servicecomb.registry.definition.DefinitionConst;
import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
-import org.apache.servicecomb.serviceregistry.event.ServiceCenterEventBus;
+import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus;
import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache;
import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheRefreshedEvent;
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index 6896ed5..cf32dde 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -19,13 +19,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.Configuration;
import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
import org.apache.servicecomb.registry.DiscoveryManager;
import org.apache.servicecomb.registry.api.event.MicroserviceInstanceChangedEvent;
-import org.apache.servicecomb.serviceregistry.event.ShutdownEvent;
import org.apache.servicecomb.registry.api.registry.BasePath;
import org.apache.servicecomb.registry.api.registry.Microservice;
import org.apache.servicecomb.registry.api.registry.MicroserviceFactory;
@@ -36,11 +38,11 @@
import org.apache.servicecomb.serviceregistry.api.Const;
import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient;
import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
+import org.apache.servicecomb.serviceregistry.event.ShutdownEvent;
import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache;
import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheKey;
import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheRefreshedEvent;
import org.apache.servicecomb.serviceregistry.registry.cache.RefreshableServiceRegistryCache;
-import org.apache.servicecomb.serviceregistry.registry.cache.ServiceRegistryCache;
import org.apache.servicecomb.serviceregistry.task.MicroserviceServiceCenterTask;
import org.apache.servicecomb.serviceregistry.task.ServiceCenterTask;
import org.slf4j.Logger;
@@ -48,7 +50,6 @@
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
-import com.google.common.util.concurrent.MoreExecutors;
public abstract class AbstractServiceRegistry implements ServiceRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServiceRegistry.class);
@@ -65,12 +66,15 @@
protected ServiceCenterTask serviceCenterTask;
- protected ExecutorService executorService = MoreExecutors.newDirectExecutorService();
+ private final ExecutorService executorService = new ThreadPoolExecutor(1, 1,
+ 0L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(5),
+ r -> new Thread(r, "instance-changed-event-task"));
+
+ protected RefreshableServiceRegistryCache serviceRegistryCache;
private String name;
- RefreshableServiceRegistryCache serviceRegistryCache;
-
public AbstractServiceRegistry(EventBus eventBus, ServiceRegistryConfig serviceRegistryConfig,
Configuration configuration) {
setName(serviceRegistryConfig.getRegistryName());
@@ -257,23 +261,26 @@
this.name = name;
}
- public ServiceRegistryCache getServiceRegistryCache() {
- return serviceRegistryCache;
- }
-
@Subscribe
public void onShutdown(ShutdownEvent event) {
+ shutdownEventHandler(event);
+ }
+
+ protected void shutdownEventHandler(ShutdownEvent event) {
LOGGER.info("service center task is shutdown.");
executorService.shutdownNow();
}
- // post from watch eventloop, should refresh the exact microservice instances immediately
@Subscribe
public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) {
- executorService.execute(new SuppressedRunnableWrapper(
- () -> {
- serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent);
- DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstanceChanged(changedEvent);
- }));
+ try {
+ executorService.execute(new SuppressedRunnableWrapper(
+ () -> {
+ serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent);
+ DiscoveryManager.INSTANCE.getAppManager().onMicroserviceInstanceChanged(changedEvent);
+ }));
+ } catch (Exception e) {
+ LOGGER.info("instance changed event ignored, {}", e.getMessage());
+ }
}
}
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
index f039736..2af08c3 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java
@@ -30,6 +30,7 @@
import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
import org.apache.servicecomb.serviceregistry.event.HeartbeatFailEvent;
import org.apache.servicecomb.serviceregistry.event.HeartbeatSuccEvent;
+import org.apache.servicecomb.serviceregistry.event.ShutdownEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +70,6 @@
},
(task, executor) -> LOGGER.warn("Too many pending tasks, reject " + task.toString())
);
- executorService = taskPool;
}
@Override
@@ -107,6 +107,12 @@
return this.taskPool;
}
+ @Override
+ protected void shutdownEventHandler(ShutdownEvent event) {
+ super.shutdownEventHandler(event);
+ this.taskPool.shutdownNow();
+ }
+
@Subscribe
public void onHeartbeatSuccEvent(HeartbeatSuccEvent heartbeatSuccEvent) {
// 可以考虑多等待一个心跳周期,这样的好处是尽可能避免provider滞后于consumer注册的情况,consumer访问provider失败
diff --git a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/ServiceRegistryFactory.java b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/ServiceRegistryFactory.java
index 966f414..9f7aedf 100644
--- a/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/ServiceRegistryFactory.java
+++ b/service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/registry/ServiceRegistryFactory.java
@@ -21,7 +21,7 @@
import org.apache.servicecomb.foundation.common.event.SimpleEventBus;
import org.apache.servicecomb.serviceregistry.ServiceRegistry;
import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig;
-import org.apache.servicecomb.serviceregistry.event.ServiceCenterEventBus;
+import org.apache.servicecomb.registry.api.event.ServiceCenterEventBus;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
diff --git a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java
index dbcf6be..977a5e2 100644
--- a/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java
+++ b/service-registry/registry-service-center/src/test/java/org/apache/servicecomb/serviceregistry/TestConsumers.java
@@ -34,10 +34,10 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
import mockit.Mock;
import mockit.MockUp;
-import org.junit.jupiter.api.Assertions;
public class TestConsumers extends TestRegistryBase {
@Before
@@ -89,6 +89,10 @@
key.setAppId(appId);
key.setServiceName(serviceName);
eventBus.post(event);
+ long begin = System.currentTimeMillis();
+ while (microserviceManager.getVersionsByName().size() > 0 && System.currentTimeMillis() - begin < 1000) {
+ Thread.yield();
+ }
Assertions.assertEquals(0, microserviceManager.getVersionsByName().size());
}