[SCB-2373]refactor retry implementation to work in invokers
diff --git a/core/pom.xml b/core/pom.xml
index 5b65327..c45bc47 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -52,6 +52,10 @@
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.servicecomb</groupId>
+ <artifactId>servicecomb-governance</artifactId>
+ </dependency>
<dependency>
<groupId>org.jmockit</groupId>
diff --git a/core/src/main/java/org/apache/servicecomb/core/governance/GovernanceConfiguration.java b/core/src/main/java/org/apache/servicecomb/core/governance/GovernanceConfiguration.java
new file mode 100644
index 0000000..eff944f
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/governance/GovernanceConfiguration.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.governance;
+
+import com.netflix.config.DynamicPropertyFactory;
+
+public class GovernanceConfiguration {
+ public static final String ROOT = "servicecomb.loadbalance.";
+
+ // retry configurations
+ public static final String RETRY_ENABLED = "retryEnabled";
+
+ public static final String RETRY_ON_NEXT = "retryOnNext";
+
+ public static final String RETRY_ON_SAME = "retryOnSame";
+
+ public static boolean isRetryEnabled(String microservice) {
+ String p = getStringProperty("false",
+ ROOT + microservice + "." + RETRY_ENABLED,
+ ROOT + RETRY_ENABLED);
+ return Boolean.parseBoolean(p);
+ }
+
+ public static int getRetryNextServer(String microservice) {
+ return getRetryServer(microservice, RETRY_ON_NEXT);
+ }
+
+ public static int getRetrySameServer(String microservice) {
+ return getRetryServer(microservice, RETRY_ON_SAME);
+ }
+
+ private static int getRetryServer(String microservice, String retryType) {
+ final int defaultValue = 0;
+ String p = getStringProperty("0",
+ ROOT + microservice + "." + retryType,
+ ROOT + retryType);
+ try {
+ int result = Integer.parseInt(p);
+ if (result > 0) {
+ return result;
+ }
+ return defaultValue;
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ public static String getStringProperty(String defaultValue, String... keys) {
+ String property;
+ for (String key : keys) {
+ property = DynamicPropertyFactory.getInstance().getStringProperty(key, null).get();
+ if (property != null) {
+ return property;
+ }
+ }
+ return defaultValue;
+ }
+}
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/MatchType.java b/core/src/main/java/org/apache/servicecomb/core/governance/MatchType.java
similarity index 97%
rename from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/MatchType.java
rename to core/src/main/java/org/apache/servicecomb/core/governance/MatchType.java
index f9f08bd..d680b55 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/MatchType.java
+++ b/core/src/main/java/org/apache/servicecomb/core/governance/MatchType.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.handler.governance;
+package org.apache.servicecomb.core.governance;
import java.util.Enumeration;
import java.util.HashMap;
diff --git a/core/src/main/java/org/apache/servicecomb/core/governance/RetryContext.java b/core/src/main/java/org/apache/servicecomb/core/governance/RetryContext.java
new file mode 100644
index 0000000..6fa4d7d
--- /dev/null
+++ b/core/src/main/java/org/apache/servicecomb/core/governance/RetryContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.core.governance;
+
+public class RetryContext {
+ public static final String RETRY_CONTEXT = "x-servicecomb-retry";
+
+ private boolean retry;
+
+ private int triedCount;
+
+ private int retryOnSame;
+
+ public RetryContext(int retryOnSame) {
+ this.retryOnSame = retryOnSame;
+ this.retry = false;
+ this.triedCount = 0;
+ }
+
+ public boolean isRetry() {
+ return retry;
+ }
+
+ public void incrementRetry() {
+ this.retry = true;
+ this.triedCount++;
+ }
+
+ public boolean trySameServer() {
+ return triedCount <= retryOnSame;
+ }
+}
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombConfigurationEventAdapter.java b/core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombConfigurationEventAdapter.java
similarity index 97%
rename from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombConfigurationEventAdapter.java
rename to core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombConfigurationEventAdapter.java
index 58e41b8..3d72a86 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombConfigurationEventAdapter.java
+++ b/core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombConfigurationEventAdapter.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.handler.governance;
+package org.apache.servicecomb.core.governance;
import java.util.HashSet;
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombInvocationContext.java b/core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombInvocationContext.java
similarity index 97%
rename from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombInvocationContext.java
rename to core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombInvocationContext.java
index fdedd0a..667737e 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombInvocationContext.java
+++ b/core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombInvocationContext.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.handler.governance;
+package org.apache.servicecomb.core.governance;
import java.util.Collections;
import java.util.HashMap;
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombMicroserviceMeta.java b/core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombMicroserviceMeta.java
similarity index 95%
rename from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombMicroserviceMeta.java
rename to core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombMicroserviceMeta.java
index 62a9b54..52074d0 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombMicroserviceMeta.java
+++ b/core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombMicroserviceMeta.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.handler.governance;
+package org.apache.servicecomb.core.governance;
import org.apache.servicecomb.governance.MicroserviceMeta;
import org.apache.servicecomb.registry.RegistrationManager;
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombRetryExtension.java b/core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombRetryExtension.java
similarity index 97%
rename from handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombRetryExtension.java
rename to core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombRetryExtension.java
index e97cdc5..cb90602 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ServiceCombRetryExtension.java
+++ b/core/src/main/java/org/apache/servicecomb/core/governance/ServiceCombRetryExtension.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.servicecomb.handler.governance;
+package org.apache.servicecomb.core.governance;
import io.vertx.core.VertxException;
diff --git a/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java b/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java
index 640bf44..feb8760 100644
--- a/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java
+++ b/core/src/main/java/org/apache/servicecomb/core/provider/consumer/InvokerUtils.java
@@ -19,10 +19,24 @@
import static org.apache.servicecomb.core.exception.Exceptions.toConsumerResponse;
+import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.net.SocketTimeoutException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import javax.annotation.Nonnull;
@@ -32,8 +46,15 @@
import org.apache.servicecomb.core.definition.MicroserviceMeta;
import org.apache.servicecomb.core.definition.OperationMeta;
import org.apache.servicecomb.core.definition.SchemaMeta;
+import org.apache.servicecomb.core.governance.GovernanceConfiguration;
+import org.apache.servicecomb.core.governance.MatchType;
+import org.apache.servicecomb.core.governance.RetryContext;
+import org.apache.servicecomb.core.governance.ServiceCombInvocationContext;
import org.apache.servicecomb.core.invocation.InvocationFactory;
import org.apache.servicecomb.foundation.common.utils.AsyncUtils;
+import org.apache.servicecomb.foundation.common.utils.BeanUtils;
+import org.apache.servicecomb.governance.handler.RetryHandler;
+import org.apache.servicecomb.governance.marker.GovernanceRequest;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
import org.apache.servicecomb.swagger.invocation.Response;
import org.apache.servicecomb.swagger.invocation.context.ContextUtils;
@@ -42,14 +63,60 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableMap;
import com.netflix.config.DynamicPropertyFactory;
+import io.github.resilience4j.decorators.Decorators;
+import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage;
+import io.github.resilience4j.retry.Retry;
+import io.github.resilience4j.retry.RetryConfig;
+import io.github.resilience4j.retry.RetryRegistry;
+import io.vavr.CheckedFunction0;
+import io.vavr.control.Try;
import io.vertx.core.Context;
+import io.vertx.core.VertxException;
public final class InvokerUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(InvokerUtils.class);
- private static boolean enableEventLoopBlockingCallCheck =
+ private static final Map<Class<? extends Throwable>, List<String>> STRICT_RETRIABLE =
+ ImmutableMap.<Class<? extends Throwable>, List<String>>builder()
+ .put(ConnectException.class, Collections.emptyList())
+ .put(SocketTimeoutException.class, Collections.emptyList())
+ /*
+ * deal with some special exceptions caused by the server side close the connection
+ */
+ .put(IOException.class, Collections.singletonList("Connection reset by peer"))
+ .put(VertxException.class, Collections.singletonList("Connection was closed"))
+ .put(NoRouteToHostException.class, Collections.emptyList())
+ .build();
+
+ private static final Object LOCK = new Object();
+
+ private static ScheduledExecutorService reactiveRetryPool;
+
+ private static ScheduledExecutorService getOrCreateRetryPool() {
+ if (reactiveRetryPool == null) {
+ synchronized (LOCK) {
+ if (reactiveRetryPool == null) {
+ reactiveRetryPool = Executors.newScheduledThreadPool(2, new ThreadFactory() {
+ private final AtomicInteger count = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, "reactive-retry-pool-thread-" + count.getAndIncrement());
+ // avoid block shutdown
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ }
+ }
+ }
+ return reactiveRetryPool;
+ }
+
+ private static final boolean ENABLE_EVENT_LOOP_BLOCKING_CALL_CHECK =
DynamicPropertyFactory.getInstance()
.getBooleanProperty("servicecomb.invocation.enableEventLoopBlockingCallCheck", true).get();
@@ -93,9 +160,8 @@
ReferenceConfig referenceConfig = microserviceReferenceConfig.createReferenceConfig(transport, operationMeta);
InvocationRuntimeType invocationRuntimeType = operationMeta.buildBaseConsumerRuntimeType();
invocationRuntimeType.setSuccessResponseType(responseType);
- Invocation invocation = InvocationFactory
+ return InvocationFactory
.forConsumer(referenceConfig, operationMeta, invocationRuntimeType, swaggerArguments);
- return invocation;
}
/**
@@ -125,9 +191,6 @@
/**
* This is an internal API, caller make sure already invoked SCBEngine.ensureStatusUp
- * @param invocation
- * @return contract result
- * @throws InvocationException
*/
public static Object syncInvoke(Invocation invocation) throws InvocationException {
Response response = innerSyncInvoke(invocation);
@@ -143,15 +206,26 @@
/**
* This is an internal API, caller make sure already invoked SCBEngine.ensureStatusUp
- * @param invocation
- * @return servicecomb response object
*/
public static Response innerSyncInvoke(Invocation invocation) {
+ invocation.onStart(null, System.nanoTime());
+
+ GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+
try {
- if (enableEventLoopBlockingCallCheck && isInEventLoop()) {
+ ServiceCombInvocationContext.setInvocationContext(invocation);
+ return decorateSyncRetry(invocation, request);
+ } finally {
+ ServiceCombInvocationContext.removeInvocationContext();
+ }
+ }
+
+ private static Response innerSyncInvokeImpl(Invocation invocation) {
+ try {
+ if (ENABLE_EVENT_LOOP_BLOCKING_CALL_CHECK && isInEventLoop()) {
throw new IllegalStateException("Can not execute sync logic in event loop. ");
}
- invocation.onStart(null, System.nanoTime());
+ updateRetryStatus(invocation);
SyncResponseExecutor respExecutor = new SyncResponseExecutor();
invocation.setResponseExecutor(respExecutor);
@@ -173,38 +247,132 @@
}
}
+ private static void updateRetryStatus(Invocation invocation) {
+ if (invocation.getHandlerIndex() != 0) {
+ // for retry, reset index
+ invocation.setHandlerIndex(0);
+ RetryContext retryContext = invocation.getLocalContext(RetryContext.RETRY_CONTEXT);
+ retryContext.incrementRetry();
+ return;
+ }
+
+ invocation.addLocalContext(RetryContext.RETRY_CONTEXT,
+ new RetryContext(GovernanceConfiguration.getRetrySameServer(invocation.getMicroserviceName())));
+ }
+
+ private static Response decorateSyncRetry(Invocation invocation, GovernanceRequest request) {
+ // governance implementations.
+ RetryHandler retryHandler = BeanUtils.getBean(RetryHandler.class);
+ Retry retry = retryHandler.getActuator(request);
+ if (retry != null) {
+ CheckedFunction0<Response> supplier = Retry.decorateCheckedSupplier(retry, () -> innerSyncInvokeImpl(invocation));
+ return Try.of(supplier).get();
+ }
+
+ if (isCompatibleRetryEnabled(invocation)) {
+ // compatible implementation for retry in load balance module in old versions.
+ retry = getOrCreateCompatibleRetry(invocation);
+ CheckedFunction0<Response> supplier = Retry.decorateCheckedSupplier(retry, () -> innerSyncInvokeImpl(invocation));
+ return Try.of(supplier).get();
+ }
+
+ // retry not enabled
+ return innerSyncInvokeImpl(invocation);
+ }
+
+ private static boolean isCompatibleRetryEnabled(Invocation invocation) {
+ // maxAttempts must be greater than or equal to 1
+ return GovernanceConfiguration.isRetryEnabled(invocation.getMicroserviceName())
+ && GovernanceConfiguration.getRetryNextServer(invocation.getMicroserviceName())
+ + GovernanceConfiguration.getRetrySameServer(invocation.getMicroserviceName()) > 0;
+ }
+
+ private static Retry getOrCreateCompatibleRetry(Invocation invocation) {
+ RetryConfig retryConfig = RetryConfig.custom()
+ // max attempts include the first call
+ .maxAttempts(GovernanceConfiguration.getRetryNextServer(invocation.getMicroserviceName())
+ + GovernanceConfiguration.getRetrySameServer(invocation.getMicroserviceName()) + 1)
+ .retryOnResult(InvokerUtils::canRetryForStatusCode)
+ .retryOnException(InvokerUtils::canRetryForException)
+ .waitDuration(Duration.ofMillis(0))
+ .build();
+ RetryRegistry retryRegistry = RetryRegistry.of(retryConfig);
+ return retryRegistry.retry(invocation.getMicroserviceName());
+ }
+
/**
* This is an internal API, caller make sure already invoked SCBEngine.ensureStatusUp
- * @param invocation
- * @param asyncResp
*/
public static void reactiveInvoke(Invocation invocation, AsyncResponse asyncResp) {
+ invocation.onStart(null, System.nanoTime());
+ invocation.setSync(false);
+
+ Supplier<CompletionStage<Response>> next = reactiveInvokeImpl(invocation);
+ DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
+ GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+
try {
- invocation.onStart(null, System.nanoTime());
- invocation.setSync(false);
-
- ReactiveResponseExecutor respExecutor = new ReactiveResponseExecutor();
- invocation.setResponseExecutor(respExecutor);
-
- invocation.onStartHandlersRequest();
- invocation.next(ar -> {
- ContextUtils.setInvocationContext(invocation.getParentContext());
- try {
- invocation.getInvocationStageTrace().finishHandlersResponse();
- invocation.onFinish(ar);
- asyncResp.handle(ar);
- } finally {
- ContextUtils.removeInvocationContext();
- }
- });
- } catch (Throwable e) {
- invocation.getInvocationStageTrace().finishHandlersResponse();
- //if throw exception,we can use 500 for status code ?
- Response response = Response.createConsumerFail(e);
- invocation.onFinish(response);
- LOGGER.error("invoke failed, {}", invocation.getOperationMeta().getMicroserviceQualifiedName());
- asyncResp.handle(response);
+ ServiceCombInvocationContext.setInvocationContext(invocation);
+ decorateReactiveRetry(invocation, dcs, request);
+ } finally {
+ ServiceCombInvocationContext.removeInvocationContext();
}
+
+ dcs.get().whenComplete((r, e) -> {
+ if (e == null) {
+ asyncResp.complete(r);
+ return;
+ }
+
+ asyncResp.consumerFail(e);
+ });
+ }
+
+ private static void decorateReactiveRetry(Invocation invocation, DecorateCompletionStage<Response> dcs,
+ GovernanceRequest request) {
+ // governance implementations.
+ RetryHandler retryHandler = BeanUtils.getBean(RetryHandler.class);
+ Retry retry = retryHandler.getActuator(request);
+ if (retry != null) {
+ dcs.withRetry(retry, getOrCreateRetryPool());
+ }
+
+ if (isCompatibleRetryEnabled(invocation)) {
+ // compatible implementation for retry in load balance module in old versions.
+ retry = getOrCreateCompatibleRetry(invocation);
+ dcs.withRetry(retry, getOrCreateRetryPool());
+ }
+ }
+
+ private static Supplier<CompletionStage<Response>> reactiveInvokeImpl(Invocation invocation) {
+ return () -> {
+ CompletableFuture<Response> result = new CompletableFuture<>();
+ try {
+ updateRetryStatus(invocation);
+
+ ReactiveResponseExecutor respExecutor = new ReactiveResponseExecutor();
+ invocation.setResponseExecutor(respExecutor);
+ invocation.onStartHandlersRequest();
+ invocation.next(ar -> {
+ ContextUtils.setInvocationContext(invocation.getParentContext());
+ try {
+ invocation.getInvocationStageTrace().finishHandlersResponse();
+ invocation.onFinish(ar);
+ result.complete(ar);
+ } finally {
+ ContextUtils.removeInvocationContext();
+ }
+ });
+ } catch (Throwable e) {
+ invocation.getInvocationStageTrace().finishHandlersResponse();
+ //if throw exception,we can use 500 for status code ?
+ Response response = Response.createConsumerFail(e);
+ invocation.onFinish(response);
+ LOGGER.error("invoke failed, {}", invocation.getOperationMeta().getMicroserviceQualifiedName());
+ result.complete(response);
+ }
+ return result;
+ };
}
public static boolean isSyncMethod(@Nonnull Method method) {
@@ -212,23 +380,47 @@
}
public static boolean isAsyncMethod(@Nonnull Method method) {
- // todo: should be extendable to support other reactive return type, eg: rxJava / project-reactor
+ // currently only support CompletableFuture for async method definition
return method.getReturnType().equals(CompletableFuture.class);
}
/**
- * should never throw exception directly
- *
- * @param invocation invocation
- * @return CompletableFuture<Response>
+ * This method is used in new Filter implementation to replace Handler
+ * NOTE: this method should never throw exception directly
*/
public static CompletableFuture<Response> invoke(Invocation invocation) {
+ Supplier<CompletionStage<Response>> next = invokeImpl(invocation);
+ DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
+ GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
+
+ try {
+ ServiceCombInvocationContext.setInvocationContext(invocation);
+ decorateReactiveRetry(invocation, dcs, request);
+ } finally {
+ ServiceCombInvocationContext.removeInvocationContext();
+ }
+
+ CompletableFuture<Response> result = new CompletableFuture<>();
+ dcs.get().whenComplete((r, e) -> {
+ if (e != null) {
+ result.completeExceptionally(e);
+ } else {
+ result.complete(r);
+ }
+ });
+ return result;
+ }
+
+ private static Supplier<CompletionStage<Response>> invokeImpl(Invocation invocation) {
invocation.onStart(null, System.nanoTime());
- invocation.onStartHandlersRequest();
- return invocation.getMicroserviceMeta().getFilterChain()
- .onFilter(invocation)
- .exceptionally(throwable -> toConsumerResponse(invocation, throwable))
- .whenComplete((response, throwable) -> finishInvocation(invocation, response));
+ return () -> {
+ updateRetryStatus(invocation);
+ invocation.onStartHandlersRequest();
+ return invocation.getMicroserviceMeta().getFilterChain()
+ .onFilter(invocation)
+ .exceptionally(throwable -> toConsumerResponse(invocation, throwable))
+ .whenComplete((response, throwable) -> finishInvocation(invocation, response));
+ };
}
private static void finishInvocation(Invocation invocation, Response response) {
@@ -243,4 +435,45 @@
invocation.getInvocationStageTrace().finishHandlersResponse();
invocation.onFinish(response);
}
+
+ private static boolean canRetryForStatusCode(Object response) {
+ // retry on status code 503
+ if (!(response instanceof Response)) {
+ return false;
+ }
+ Response resp = (Response) response;
+ if (!resp.isFailed()) {
+ return false;
+ }
+ if (resp.getResult() instanceof InvocationException) {
+ InvocationException e = resp.getResult();
+ return e.getStatusCode() == 503;
+ }
+ return false;
+ }
+
+ private static boolean canRetryForException(Throwable throwableToSearchIn) {
+ // retry on exception type on message match
+ int infiniteLoopPreventionCounter = 10;
+ while (throwableToSearchIn != null && infiniteLoopPreventionCounter > 0) {
+ infiniteLoopPreventionCounter--;
+ for (Entry<Class<? extends Throwable>, List<String>> c : STRICT_RETRIABLE.entrySet()) {
+ Class<? extends Throwable> key = c.getKey();
+ if (key.isAssignableFrom(throwableToSearchIn.getClass())) {
+ if (c.getValue() == null || c.getValue().isEmpty()) {
+ return true;
+ } else {
+ String msg = throwableToSearchIn.getMessage();
+ for (String val : c.getValue()) {
+ if (val.equals(msg)) {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ throwableToSearchIn = throwableToSearchIn.getCause();
+ }
+ return false;
+ }
}
diff --git a/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/CustomLoadbalanceExtensionsFactory.java b/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/CustomLoadbalanceExtensionsFactory.java
index d698612..c7c545b 100644
--- a/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/CustomLoadbalanceExtensionsFactory.java
+++ b/demo/demo-jaxrs/jaxrs-client/src/main/java/org/apache/servicecomb/demo/jaxrs/client/CustomLoadbalanceExtensionsFactory.java
@@ -23,7 +23,6 @@
import org.springframework.stereotype.Component;
import com.netflix.client.DefaultLoadBalancerRetryHandler;
-import com.netflix.client.RetryHandler;
import com.netflix.client.Utils;
@Component
@@ -50,18 +49,11 @@
@Override
public boolean isSupport(String key, String value) {
return (Configuration.RULE_STRATEGY_NAME.equals(key) &&
- "mycustomrule".equals(value))
- || (Configuration.RETRY_HANDLER.equals(key) &&
- "mycustomhandler".equals(value));
+ "mycustomrule".equals(value));
}
@Override
public RuleExt createLoadBalancerRule(String ruleName) {
return new MyCustomRule();
}
-
- @Override
- public RetryHandler createRetryHandler(String retryName, String microservice) {
- return new MyCustomHandler(1, 1, true);
- }
}
diff --git a/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml b/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
index 54b9929..a0ac2a2 100644
--- a/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
+++ b/demo/demo-jaxrs/jaxrs-client/src/main/resources/microservice.yaml
@@ -30,7 +30,6 @@
strategy:
name: mycustomrule
retryEnabled: true
- retryHandler: mycustomhandler
request:
timeout: 30000
diff --git a/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/validator/Teacher.java b/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/validator/Teacher.java
index c1fc63e..009a250 100644
--- a/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/validator/Teacher.java
+++ b/demo/demo-schema/src/main/java/org/apache/servicecomb/demo/validator/Teacher.java
@@ -21,7 +21,7 @@
public class Teacher {
- @NotBlank
+ @NotBlank(message = "must not be blank")
private String name;
private String age;
diff --git a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java
index 991016a..f22675f 100644
--- a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java
+++ b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/CodeFirstRestTemplateSpringmvc.java
@@ -237,6 +237,8 @@
int retryResult = template.getForObject(cseUrlPrefix + "retrySuccess?a=2&b=3", Integer.class);
TestMgr.check(retryResult, 5);
+ retryResult = template.getForObject(cseUrlPrefix + "retrySuccess?a=2&b=3", Integer.class);
+ TestMgr.check(retryResult, 5);
}
private void testCodeFirstTestForm(RestTemplate template, String cseUrlPrefix) {
diff --git a/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/TestRetrySchema.java b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/TestRetrySchema.java
new file mode 100644
index 0000000..e103d70
--- /dev/null
+++ b/demo/demo-springmvc/springmvc-client/src/main/java/org/apache/servicecomb/demo/springmvc/client/TestRetrySchema.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.demo.springmvc.client;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.servicecomb.demo.CategorizedTestCase;
+import org.apache.servicecomb.demo.TestMgr;
+import org.apache.servicecomb.provider.pojo.RpcReference;
+import org.apache.servicecomb.provider.springmvc.reference.RestTemplateBuilder;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+
+@Component
+public class TestRetrySchema implements CategorizedTestCase {
+ interface RetrySchemaInf {
+ boolean successWhenRetry();
+
+ CompletableFuture<Boolean> successWhenRetryAsync();
+ }
+
+ @RpcReference(microserviceName = "springmvc", schemaId = "RetrySchema")
+ private RetrySchemaInf retrySchemaInf;
+
+ RestTemplate restTemplate = RestTemplateBuilder.create();
+
+ final String server = "servicecomb://springmvc";
+
+ @Override
+ public void testAllTransport() throws Exception {
+ testRetryGovernanceRestTemplate();
+ testRetryGovernanceRpc();
+ }
+
+ private void testRetryGovernanceRpc() throws Exception {
+ TestMgr.check(retrySchemaInf.successWhenRetry(), true);
+ TestMgr.check(retrySchemaInf.successWhenRetry(), true);
+
+ TestMgr.check(retrySchemaInf.successWhenRetryAsync().get(), true);
+ TestMgr.check(retrySchemaInf.successWhenRetryAsync().get(), true);
+ }
+
+ private void testRetryGovernanceRestTemplate() {
+ TestMgr.check(restTemplate.getForObject(server + "/retry/governance/successWhenRetry", boolean.class), true);
+ TestMgr.check(restTemplate.getForObject(server + "/retry/governance/successWhenRetry", boolean.class), true);
+ }
+}
diff --git a/demo/demo-springmvc/springmvc-client/src/main/resources/microservice.yaml b/demo/demo-springmvc/springmvc-client/src/main/resources/microservice.yaml
index bc5e1c0..550192b 100644
--- a/demo/demo-springmvc/springmvc-client/src/main/resources/microservice.yaml
+++ b/demo/demo-springmvc/springmvc-client/src/main/resources/microservice.yaml
@@ -118,6 +118,17 @@
test:
duplicate1: newer
+# test governance retry
+ matchGroup:
+ retry-governance: |
+ matches:
+ - apiPath:
+ prefix: "/retry/governance/"
+ retry:
+ retry-governance: |
+ maxAttempts: 2
+ retryOnResponseStatus: [500]
+
cse:
fallback:
Consumer:
diff --git a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java
index bbd1b17..2633df0 100644
--- a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java
+++ b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/CodeFirstSpringmvc.java
@@ -93,7 +93,7 @@
public class CodeFirstSpringmvc {
private static final Logger LOGGER = LoggerFactory.getLogger(CodeFirstSpringmvc.class);
- private AtomicInteger firstInovcation = new AtomicInteger(2);
+ private AtomicInteger invocationCounter = new AtomicInteger(0);
private String _fileUpload(MultipartFile file1, Part file2) {
try (InputStream is1 = file1.getInputStream(); InputStream is2 = file2.getInputStream()) {
@@ -114,7 +114,7 @@
@GetMapping(path = "/retrySuccess")
public int retrySuccess(@RequestParam("a") int a, @RequestParam("b") int b) {
- if (firstInovcation.getAndDecrement() > 0) {
+ if (invocationCounter.getAndIncrement() % 3 != 0) {
throw new InvocationException(Status.SERVICE_UNAVAILABLE, "try again later.");
}
return a + b;
diff --git a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/ProducerTestsAfterBootup.java b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/ProducerTestsAfterBootup.java
index 16ddd92..d7c13cb 100644
--- a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/ProducerTestsAfterBootup.java
+++ b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/ProducerTestsAfterBootup.java
@@ -91,9 +91,9 @@
public void testRegisteredBasePath() {
if (DynamicPropertyFactory.getInstance().getBooleanProperty("servicecomb.test.vert.transport", true).get()) {
- TestMgr.check(20, RegistrationManager.INSTANCE.getMicroservice().getPaths().size());
- } else {
TestMgr.check(21, RegistrationManager.INSTANCE.getMicroservice().getPaths().size());
+ } else {
+ TestMgr.check(22, RegistrationManager.INSTANCE.getMicroservice().getPaths().size());
}
}
diff --git a/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/RetrySchema.java b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/RetrySchema.java
new file mode 100644
index 0000000..0d6d7ae
--- /dev/null
+++ b/demo/demo-springmvc/springmvc-server/src/main/java/org/apache/servicecomb/demo/springmvc/server/RetrySchema.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.demo.springmvc.server;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.ws.rs.core.Response.Status;
+
+import org.apache.servicecomb.provider.rest.common.RestSchema;
+import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+
+// test cases for retry
+@RestSchema(schemaId = "RetrySchema")
+@RequestMapping(path = "/retry", produces = MediaType.APPLICATION_JSON_VALUE)
+public class RetrySchema {
+ private AtomicLong counter = new AtomicLong(0);
+
+ // 测试基于流量特征治理
+ @GetMapping(path = "/governance/successWhenRetry")
+ public boolean successWhenRetry() {
+ if (counter.getAndIncrement() % 3 != 0) {
+ throw new InvocationException(Status.INTERNAL_SERVER_ERROR, "try again later.");
+ }
+ return true;
+ }
+
+ @GetMapping(path = "/governance/successWhenRetryAsync")
+ public CompletableFuture<Boolean> successWhenRetryAsync() {
+ CompletableFuture<Boolean> result = new CompletableFuture<>();
+ if (counter.getAndIncrement() % 2 == 0) {
+ result.completeExceptionally(new InvocationException(Status.INTERNAL_SERVER_ERROR, "try again later."));
+ } else {
+ result.complete(true);
+ }
+ return result;
+ }
+}
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
index e52e4d2..f5cfbf7 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ConsumerGovernanceHandler.java
@@ -17,125 +17,14 @@
package org.apache.servicecomb.handler.governance;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
-
import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
-import org.apache.servicecomb.core.provider.consumer.SyncResponseExecutor;
-import org.apache.servicecomb.foundation.common.utils.BeanUtils;
-import org.apache.servicecomb.governance.handler.RetryHandler;
-import org.apache.servicecomb.governance.marker.GovernanceRequest;
import org.apache.servicecomb.swagger.invocation.AsyncResponse;
-import org.apache.servicecomb.swagger.invocation.Response;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.github.resilience4j.decorators.Decorators;
-import io.github.resilience4j.decorators.Decorators.DecorateCompletionStage;
-import io.github.resilience4j.retry.Retry;
public class ConsumerGovernanceHandler implements Handler {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerGovernanceHandler.class);
-
- private RetryHandler retryHandler = BeanUtils.getBean(RetryHandler.class);
-
- private static final ScheduledExecutorService RETRY_POOL = Executors.newScheduledThreadPool(2, new ThreadFactory() {
- private AtomicInteger count = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, "governance-retry-pool-thread-" + count.getAndIncrement());
- // avoid block shutdown
- thread.setDaemon(true);
- return thread;
- }
- });
-
+ // an empty implementation, will add possible features in future.
@Override
public void handle(Invocation invocation, AsyncResponse asyncResp) throws Exception {
- Supplier<CompletionStage<Response>> next = createBusinessCompletionStageSupplier(invocation);
- DecorateCompletionStage<Response> dcs = Decorators.ofCompletionStage(next);
- GovernanceRequest request = MatchType.createGovHttpRequest(invocation);
-
- try {
- ServiceCombInvocationContext.setInvocationContext(invocation);
- addRetry(dcs, request);
- } finally {
- ServiceCombInvocationContext.removeInvocationContext();
- }
-
- final SyncResponseExecutor originalExecutor;
- final Executor newExecutor;
- if (invocation.getResponseExecutor() instanceof SyncResponseExecutor) {
- originalExecutor = (SyncResponseExecutor) invocation.getResponseExecutor();
- newExecutor = command -> {
- // retry的场景,对于同步调用, 同步调用的主线程已经被挂起,无法再主线程中进行重试;
- // 重试的场景,主线程等待响应线程唤醒。因此需要转换主线程,响应唤醒新的主线程,在重试逻辑成功后,再唤醒原来的主线程。
- // 重试也不能在网络线程(event-loop)中进行,未被保护的阻塞操作会导致网络线程挂起
- RETRY_POOL.submit(command);
- };
- invocation.setResponseExecutor(newExecutor);
- } else {
- originalExecutor = null;
- newExecutor = null;
- }
-
- dcs.get().whenComplete((r, e) -> {
- if (e == null) {
- if (originalExecutor != null) {
- originalExecutor.execute(() -> {
- asyncResp.complete(r);
- });
- } else {
- asyncResp.complete(r);
- }
- return;
- }
-
- if (originalExecutor != null) {
- originalExecutor.execute(() -> {
- asyncResp.consumerFail(e);
- });
- } else {
- asyncResp.consumerFail(e);
- }
- });
- }
-
- private void addRetry(DecorateCompletionStage<Response> dcs, GovernanceRequest request) {
- Retry retry = retryHandler.getActuator(request);
- if (retry != null) {
- dcs.withRetry(retry, RETRY_POOL);
- }
- }
-
- private Supplier<CompletionStage<Response>> createBusinessCompletionStageSupplier(Invocation invocation) {
- final int currentHandler = invocation.getHandlerIndex();
- final AtomicBoolean isRetryHolder = new AtomicBoolean(false);
-
- return () -> {
- CompletableFuture<Response> result = new CompletableFuture<>();
- if (isRetryHolder.getAndSet(true)) {
- invocation.setHandlerIndex(currentHandler);
- LOGGER.info("retry operation {}, trace id {}",
- invocation.getOperationMeta().getMicroserviceQualifiedName(), invocation.getTraceId());
- }
- try {
- invocation.next(response -> {
- result.complete(response);
- });
- } catch (Exception e) {
- result.completeExceptionally(e);
- }
- return result;
- };
+ invocation.next(asyncResp);
}
}
diff --git a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
index e31cfd2..e539c23 100644
--- a/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
+++ b/handlers/handler-governance/src/main/java/org/apache/servicecomb/handler/governance/ProviderGovernanceHandler.java
@@ -23,6 +23,8 @@
import org.apache.servicecomb.core.Handler;
import org.apache.servicecomb.core.Invocation;
+import org.apache.servicecomb.core.governance.MatchType;
+import org.apache.servicecomb.core.governance.ServiceCombInvocationContext;
import org.apache.servicecomb.foundation.common.utils.BeanUtils;
import org.apache.servicecomb.governance.handler.BulkheadHandler;
import org.apache.servicecomb.governance.handler.CircuitBreakerHandler;
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java
index a31895a..d4ea2e6 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/Configuration.java
@@ -43,12 +43,6 @@
// retry configurations
public static final String RETRY_HANDLER = "retryHandler";
- public static final String RETRY_ENABLED = "retryEnabled";
-
- public static final String RETRY_ON_NEXT = "retryOnNext";
-
- public static final String RETRY_ON_SAME = "retryOnSame";
-
// SessionStickinessRule configruation
public static final String SESSION_TIMEOUT_IN_SECONDS = "SessionStickinessRule.sessionTimeoutInSeconds";
@@ -112,43 +106,6 @@
}
}
- public String getRetryHandler(String microservice) {
- return getStringProperty("default",
- ROOT + microservice + "." + RETRY_HANDLER,
- ROOT + RETRY_HANDLER);
- }
-
- public boolean isRetryEnabled(String microservice) {
- String p = getStringProperty("false",
- ROOT + microservice + "." + RETRY_ENABLED,
- ROOT + RETRY_ENABLED);
- return Boolean.parseBoolean(p);
- }
-
- public int getRetryNextServer(String microservice) {
- return getRetryServer(microservice, RETRY_ON_NEXT);
- }
-
- public int getRetrySameServer(String microservice) {
- return getRetryServer(microservice, RETRY_ON_SAME);
- }
-
- private int getRetryServer(String microservice, String retryType) {
- final int defaultValue = 0;
- String p = getStringProperty("0",
- ROOT + microservice + "." + retryType,
- ROOT + retryType);
- try {
- int result = Integer.parseInt(p);
- if (result > 0) {
- return result;
- }
- return defaultValue;
- } catch (NumberFormatException e) {
- return defaultValue;
- }
- }
-
public boolean isIsolationFilterOpen(String microservice) {
String p = getStringProperty("true",
ROOT + microservice + "." + FILTER_ISOLATION + FILTER_OPEN,
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java
deleted file mode 100644
index dcb01b2..0000000
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/DefaultRetryExtensionsFactory.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicecomb.loadbalance;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.NoRouteToHostException;
-import java.net.SocketTimeoutException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
-import org.springframework.stereotype.Component;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.netflix.client.DefaultLoadBalancerRetryHandler;
-import com.netflix.client.RetryHandler;
-
-import io.vertx.core.VertxException;
-
-@Component
-public class DefaultRetryExtensionsFactory implements ExtensionsFactory {
- private static final Collection<String> ACCEPT_KEYS = Lists.newArrayList(
- Configuration.RETRY_HANDLER);
-
- private static final String RETRY_DEFAULT = "default";
-
- private static final Collection<String> ACCEPT_VALUES = Lists.newArrayList(
- RETRY_DEFAULT);
-
- private static final Map<Class<? extends Throwable>, List<String>> strictRetriable =
- ImmutableMap.<Class<? extends Throwable>, List<String>>builder()
- .put(ConnectException.class, Arrays.asList())
- .put(SocketTimeoutException.class, Arrays.asList())
- /*
- * deal with some special exceptions caused by the server side close the connection
- */
- .put(IOException.class, Arrays.asList(new String[] {"Connection reset by peer"}))
- .put(VertxException.class, Arrays.asList(new String[] {"Connection was closed"}))
- .put(NoRouteToHostException.class, Arrays.asList())
- .build();
-
- @Override
- public boolean isSupport(String key, String value) {
- return ACCEPT_KEYS.contains(key) && ACCEPT_VALUES.contains(value);
- }
-
- public RetryHandler createRetryHandler(String retryName, String microservice) {
- return new DefaultLoadBalancerRetryHandler(
- Configuration.INSTANCE.getRetrySameServer(microservice),
- Configuration.INSTANCE.getRetryNextServer(microservice), true) {
-
- @Override
- public boolean isRetriableException(Throwable e, boolean sameServer) {
- boolean retryable = isPresentAsCause(e);
- if (!retryable) {
- if (e instanceof InvocationException) {
- if (((InvocationException) e).getStatusCode() == 503) {
- return true;
- }
- }
- }
- return retryable;
- }
-
- public boolean isPresentAsCause(Throwable throwableToSearchIn) {
- int infiniteLoopPreventionCounter = 10;
- while (throwableToSearchIn != null && infiniteLoopPreventionCounter > 0) {
- infiniteLoopPreventionCounter--;
- for (Entry<Class<? extends Throwable>, List<String>> c : strictRetriable.entrySet()) {
- Class<? extends Throwable> key = c.getKey();
- if (key.isAssignableFrom(throwableToSearchIn.getClass())) {
- if (c.getValue() == null || c.getValue().isEmpty()) {
- return true;
- } else {
- String msg = throwableToSearchIn.getMessage();
- for (String val : c.getValue()) {
- if (val.equals(msg)) {
- return true;
- }
- }
- }
- }
- }
- throwableToSearchIn = throwableToSearchIn.getCause();
- }
- return false;
- }
- };
- }
-}
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java
index ce3dca4..342bb8c 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsFactory.java
@@ -16,8 +16,6 @@
*/
package org.apache.servicecomb.loadbalance;
-import com.netflix.client.RetryHandler;
-
/**
* By implements ExtensionsFactory, users can add new extends for rules, filters, etc.
*/
@@ -31,8 +29,4 @@
default ServerListFilterExt createServerListFilter(String key, String value, Object... args) {
return null;
}
-
- default RetryHandler createRetryHandler(String retryName, String microservice) {
- return null;
- }
}
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java
index ff91963..bb07c18 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/ExtensionsManager.java
@@ -22,8 +22,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.netflix.client.RetryHandler;
-
public class ExtensionsManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ExtensionsManager.class);
@@ -52,18 +50,4 @@
LOGGER.info("Using load balance rule {} for microservice {}.", rule.getClass().getName(), microservice);
return rule;
}
-
- public static RetryHandler createRetryHandler(String microservice) {
- RetryHandler handler = null;
- for (ExtensionsFactory factory : extentionFactories) {
- if (factory.isSupport(Configuration.RETRY_HANDLER, Configuration.INSTANCE.getRetryHandler(microservice))) {
- handler = factory.createRetryHandler(Configuration.INSTANCE.getRetryHandler(microservice), microservice);
- break;
- }
- }
-
- // handler can not be null. handler will be created for each invocation.
- LOGGER.debug("Using retry handler {} for microservice {}.", handler.getClass().getName(), microservice);
- return handler;
- }
}
diff --git a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java
index 3576e3b..dc8944f 100644
--- a/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java
+++ b/handlers/handler-loadbalance/src/main/java/org/apache/servicecomb/loadbalance/LoadbalanceHandler.java
@@ -18,15 +18,8 @@
package org.apache.servicecomb.loadbalance;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.core.Response.Status;
@@ -36,10 +29,9 @@
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.SCBEngine;
import org.apache.servicecomb.core.Transport;
-import org.apache.servicecomb.core.provider.consumer.SyncResponseExecutor;
+import org.apache.servicecomb.core.governance.RetryContext;
import org.apache.servicecomb.foundation.common.cache.VersionedCache;
import org.apache.servicecomb.foundation.common.concurrent.ConcurrentHashMapEx;
-import org.apache.servicecomb.foundation.common.utils.ExceptionUtils;
import org.apache.servicecomb.loadbalance.filter.ServerDiscoveryFilter;
import org.apache.servicecomb.registry.discovery.DiscoveryContext;
import org.apache.servicecomb.registry.discovery.DiscoveryFilter;
@@ -52,20 +44,16 @@
import org.slf4j.LoggerFactory;
import com.netflix.config.DynamicPropertyFactory;
-import com.netflix.loadbalancer.ILoadBalancer;
-import com.netflix.loadbalancer.Server;
-import com.netflix.loadbalancer.reactive.ExecutionContext;
-import com.netflix.loadbalancer.reactive.ExecutionInfo;
-import com.netflix.loadbalancer.reactive.ExecutionListener;
-import com.netflix.loadbalancer.reactive.LoadBalancerCommand;
-import com.netflix.loadbalancer.reactive.ServerOperation;
-
-import rx.Observable;
/**
* Load balance handler.
*/
public class LoadbalanceHandler implements Handler {
+ public static final String CONTEXT_KEY_LAST_SERVER = "x-context-last-server";
+
+ // Enough times to make sure to choose a different server in high volume.
+ private static final int COUNT = 17;
+
public static final String CONTEXT_KEY_SERVER_LIST = "x-context-server-list";
public static final String SERVICECOMB_SERVER_ENDPOINT = "scb-endpoint";
@@ -76,85 +64,12 @@
DynamicPropertyFactory.getInstance()
.getBooleanProperty("servicecomb.loadbalance.userDefinedEndpoint.enabled", false).get();
- // just a wrapper to make sure in retry mode to choose a different server.
- class RetryLoadBalancer implements ILoadBalancer {
- // Enough times to make sure to choose a different server in high volume.
- static final int COUNT = 17;
-
- Server lastServer = null;
-
- LoadBalancer delegate;
-
- RetryLoadBalancer(LoadBalancer delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public void addServers(List<Server> newServers) {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public Server chooseServer(Object key) {
- Invocation invocation = (Invocation) key;
- boolean isRetry = null != lastServer;
- for (int i = 0; i < COUNT; i++) {
- Server s = delegate.chooseServer(invocation);
- if (s == null) {
- break;
- }
- if (!s.equals(lastServer)) {
- lastServer = s;
- break;
- }
- }
- if (isRetry) {
- invocation.getTraceIdLogger().info(LOGGER, "retry to instance [{}]", lastServer.getHostPort());
- }
-
- return lastServer;
- }
-
- @Override
- public void markServerDown(Server server) {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- @Deprecated
- public List<Server> getServerList(boolean availableOnly) {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public List<Server> getReachableServers() {
- throw new UnsupportedOperationException("Not implemented.");
- }
-
- @Override
- public List<Server> getAllServers() {
- throw new UnsupportedOperationException("Not implemented.");
- }
- }
-
private static final Logger LOGGER = LoggerFactory.getLogger(LoadbalanceHandler.class);
- private static final ExecutorService RETRY_POOL = Executors.newCachedThreadPool(new ThreadFactory() {
- private AtomicInteger count = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r, "retry-pool-thread-" + count.getAndIncrement());
- // avoid block shutdown
- thread.setDaemon(true);
- return thread;
- }
- });
-
private DiscoveryTree discoveryTree = new DiscoveryTree();
// key为grouping filter qualified name
- private volatile Map<String, LoadBalancer> loadBalancerMap = new ConcurrentHashMapEx<>();
+ private final Map<String, LoadBalancer> loadBalancerMap = new ConcurrentHashMapEx<>();
private final Object lock = new Object();
@@ -214,11 +129,7 @@
LoadBalancer loadBalancer = getOrCreateLoadBalancer(invocation);
- if (!Configuration.INSTANCE.isRetryEnabled(invocation.getMicroserviceName())) {
- send(invocation, asyncResp, loadBalancer);
- } else {
- sendWithRetry(invocation, asyncResp, loadBalancer);
- }
+ send(invocation, asyncResp, loadBalancer);
}
// user's can invoke a service by supplying target Endpoint.
@@ -259,9 +170,7 @@
}
invocation.setEndpoint((Endpoint) endpoint);
- invocation.next(resp -> {
- asyncResp.handle(resp);
- });
+ invocation.next(asyncResp);
return true;
}
@@ -271,7 +180,7 @@
private void send(Invocation invocation, AsyncResponse asyncResp, LoadBalancer chosenLB) throws Exception {
long time = System.currentTimeMillis();
- ServiceCombServer server = chosenLB.chooseServer(invocation);
+ ServiceCombServer server = chooseServer(invocation, chosenLB);
if (null == server) {
asyncResp.consumerFail(new InvocationException(Status.INTERNAL_SERVER_ERROR, "No available address found."));
return;
@@ -289,158 +198,52 @@
chosenLB.getLoadBalancerStats().incrementActiveRequestsCount(server);
ServiceCombLoadBalancerStats.INSTANCE.markSuccess(server);
}
+ // clear endpoint after invocation finished. In retry, will choose a new server, this is different than
+ // user defined endpoint
+ invocation.setEndpoint(null);
asyncResp.handle(resp);
});
}
- private void sendWithRetry(Invocation invocation, AsyncResponse asyncResp,
- LoadBalancer chosenLB) throws Exception {
- long time = System.currentTimeMillis();
- // retry in loadbalance, 2.0 feature
- int currentHandler = invocation.getHandlerIndex();
-
- SyncResponseExecutor originalExecutor;
- Executor newExecutor;
- if (invocation.getResponseExecutor() instanceof SyncResponseExecutor) {
- originalExecutor = (SyncResponseExecutor) invocation.getResponseExecutor();
- newExecutor = new Executor() {
- @Override
- public void execute(Runnable command) {
- // retry的场景,对于同步调用, 同步调用的主线程已经被挂起,无法再主线程中进行重试;
- // 重试也不能在网络线程(event-loop)中进行,未被保护的阻塞操作会导致网络线程挂起
- RETRY_POOL.submit(command);
- }
- };
- invocation.setResponseExecutor(newExecutor);
- } else {
- originalExecutor = null;
- newExecutor = null;
+ private ServiceCombServer chooseServer(Invocation invocation, LoadBalancer chosenLB) {
+ RetryContext retryContext = invocation.getLocalContext(RetryContext.RETRY_CONTEXT);
+ if (retryContext == null) {
+ return chosenLB.chooseServer(invocation);
}
- ExecutionListener<Invocation, Response> listener = new ExecutionListener<Invocation, Response>() {
- @Override
- public void onExecutionStart(ExecutionContext<Invocation> context) throws AbortExecutionException {
- }
+ if (!retryContext.isRetry()) {
+ ServiceCombServer server = chosenLB.chooseServer(invocation);
+ invocation.addLocalContext(CONTEXT_KEY_LAST_SERVER, server);
+ return server;
+ }
- @Override
- public void onStartWithServer(ExecutionContext<Invocation> context,
- ExecutionInfo info) throws AbortExecutionException {
- }
-
- @Override
- public void onExceptionWithServer(ExecutionContext<Invocation> context, Throwable exception,
- ExecutionInfo info) {
- context.getRequest().getTraceIdLogger()
- .error(LOGGER, "Invoke server failed. Operation {}; server {}; {}-{} msg {}",
- context.getRequest().getInvocationQualifiedName(),
- context.getRequest().getEndpoint(),
- info.getNumberOfPastServersAttempted(),
- info.getNumberOfPastAttemptsOnServer(),
- ExceptionUtils.getExceptionMessageWithoutTrace(exception));
- }
-
- @Override
- public void onExecutionSuccess(ExecutionContext<Invocation> context, Response response,
- ExecutionInfo info) {
- if (info.getNumberOfPastServersAttempted() > 0 || info.getNumberOfPastAttemptsOnServer() > 0) {
- context.getRequest().getTraceIdLogger().error(LOGGER, "Invoke server success. Operation {}; server {}",
- context.getRequest().getInvocationQualifiedName(),
- context.getRequest().getEndpoint());
+ ServiceCombServer lastServer = invocation.getLocalContext(CONTEXT_KEY_LAST_SERVER);
+ ServiceCombServer nextServer = lastServer;
+ if (!retryContext.trySameServer()) {
+ for (int i = 0; i < COUNT; i++) {
+ ServiceCombServer s = chosenLB.chooseServer(invocation);
+ if (s == null) {
+ break;
}
- if (originalExecutor != null) {
- originalExecutor.execute(() -> {
- asyncResp.complete(response);
- });
- } else {
- asyncResp.complete(response);
+ if (!s.equals(nextServer)) {
+ nextServer = s;
+ break;
}
}
+ }
- @Override
- public void onExecutionFailed(ExecutionContext<Invocation> context, Throwable finalException,
- ExecutionInfo info) {
- context.getRequest().getTraceIdLogger().error(LOGGER, "Invoke all server failed. Operation {}, e={}",
- context.getRequest().getInvocationQualifiedName(),
- ExceptionUtils.getExceptionMessageWithoutTrace(finalException));
- if (originalExecutor != null) {
- originalExecutor.execute(() -> {
- fail(finalException);
- });
- } else {
- fail(finalException);
- }
- }
-
- private void fail(Throwable finalException) {
- int depth = 10;
- Throwable t = finalException;
- while (depth-- > 0) {
- if (t instanceof InvocationException) {
- asyncResp.consumerFail(t);
- return;
- }
- t = finalException.getCause();
- }
- asyncResp.consumerFail(finalException);
- }
- };
- List<ExecutionListener<Invocation, Response>> listeners = new ArrayList<>(0);
- listeners.add(listener);
- ExecutionContext<Invocation> context = new ExecutionContext<>(invocation, null, null, null);
-
- LoadBalancerCommand<Response> command = LoadBalancerCommand.<Response>builder()
- .withLoadBalancer(new RetryLoadBalancer(chosenLB))
- .withServerLocator(invocation)
- .withRetryHandler(ExtensionsManager.createRetryHandler(invocation.getMicroserviceName()))
- .withListeners(listeners)
- .withExecutionContext(context)
- .build();
-
- Observable<Response> observable = command.submit(new ServerOperation<Response>() {
- public Observable<Response> call(Server s) {
- return Observable.create(f -> {
- try {
- ServiceCombServer server = (ServiceCombServer) s;
- chosenLB.getLoadBalancerStats().incrementNumRequests(s);
- invocation.setHandlerIndex(currentHandler); // for retry
- invocation.setEndpoint(server.getEndpoint());
- invocation.next(resp -> {
- if (isFailedResponse(resp)) {
- invocation.getTraceIdLogger().error(LOGGER, "service {}, call error, msg is {}, server is {} ",
- invocation.getInvocationQualifiedName(),
- ExceptionUtils.getExceptionMessageWithoutTrace((Throwable) resp.getResult()),
- s);
- chosenLB.getLoadBalancerStats().incrementSuccessiveConnectionFailureCount(s);
- ServiceCombLoadBalancerStats.INSTANCE.markFailure(server);
- f.onError(resp.getResult());
- } else {
- chosenLB.getLoadBalancerStats().incrementActiveRequestsCount(s);
- chosenLB.getLoadBalancerStats().noteResponseTime(s,
- (System.currentTimeMillis() - time));
- ServiceCombLoadBalancerStats.INSTANCE.markSuccess(server);
- f.onNext(resp);
- f.onCompleted();
- }
- });
- } catch (Exception e) {
- invocation.getTraceIdLogger()
- .error(LOGGER, "execution error, msg is {}", ExceptionUtils.getExceptionMessageWithoutTrace(e));
- f.onError(e);
- }
- });
- }
- });
-
- observable.subscribe(response -> {
- }, error -> {
- }, () -> {
- });
+ LOGGER.info("retry to instance [{}], last instance [{}], trace id {}",
+ nextServer.getHostPort(),
+ lastServer.getHostPort(),
+ invocation.getTraceId());
+ invocation.addLocalContext(CONTEXT_KEY_LAST_SERVER, lastServer);
+ return lastServer;
}
protected boolean isFailedResponse(Response resp) {
if (resp.isFailed()) {
- if (InvocationException.class.isInstance(resp.getResult())) {
- InvocationException e = (InvocationException) resp.getResult();
+ if (resp.getResult() instanceof InvocationException) {
+ InvocationException e = resp.getResult();
return e.getStatusCode() == ExceptionFactory.CONSUMER_INNER_STATUS_CODE
|| e.getStatusCode() == Status.SERVICE_UNAVAILABLE.getStatusCode()
|| e.getStatusCode() == Status.REQUEST_TIMEOUT.getStatusCode();
diff --git a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestConfiguration.java b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestConfiguration.java
index 2dfaa52..1c88154 100644
--- a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestConfiguration.java
+++ b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestConfiguration.java
@@ -28,9 +28,6 @@
import com.netflix.config.ConcurrentCompositeConfiguration;
-import mockit.Mock;
-import mockit.MockUp;
-
/**
*
*/
@@ -45,9 +42,6 @@
assertEquals("servicecomb.loadbalance.", Configuration.ROOT);
assertEquals("ribbon.", Configuration.ROOT_20);
- assertEquals("retryEnabled", Configuration.RETRY_ENABLED);
- assertEquals("retryOnNext", Configuration.RETRY_ON_NEXT);
- assertEquals("retryOnSame", Configuration.RETRY_ON_SAME);
assertEquals("SessionStickinessRule.successiveFailedTimes", Configuration.SUCCESSIVE_FAILED_TIMES);
assertEquals("maxSingleTestWindow", Configuration.FILTER_MAX_SINGLE_TEST_WINDOW);
@@ -56,103 +50,11 @@
@Test
public void testFullConfigurationWithArgsString() {
- assertNotNull(Configuration.INSTANCE.getRetryNextServer("test"));
- assertNotNull(Configuration.INSTANCE.getRetrySameServer("test"));
- assertNotNull(Configuration.INSTANCE.isRetryEnabled("test"));
assertNotNull(Configuration.INSTANCE.getSuccessiveFailedTimes("test"));
assertNotNull(Configuration.INSTANCE.getSessionTimeoutInSeconds("test"));
}
@Test
- public void testConfigurationWithGetpropertyReturnsStringChar() {
-
- new MockUp<Configuration>() {
- @Mock
- private String getProperty(String defaultValue, String... keys) {
- return "tyt";
- }
- };
-
- Configuration.INSTANCE.getRetryNextServer("test");
-
- assertNotNull(Configuration.INSTANCE.getRetryNextServer("test"));
- }
-
- @Test
- public void testConfigurationWithGetpropertyReturnsStringNum() {
-
- new MockUp<Configuration>() {
-
- @Mock
- private String getProperty(String defaultValue, String... keys) {
- return "1234";
- }
- };
-
- Configuration.INSTANCE.getRetryNextServer("test");
-
- assertNotNull(Configuration.INSTANCE.getRetryNextServer("test"));
- }
-
- @Test
- public void testGetRetryOnSameWithGetpropertyReturnsStringChar() {
-
- new MockUp<Configuration>() {
- @Mock
- private String getProperty(String defaultValue, String... keys) {
- return "tyt";
- }
- };
-
- Configuration.INSTANCE.getRetrySameServer("test");
- assertNotNull(Configuration.INSTANCE.getRetrySameServer("test"));
- }
-
- @Test
- public void testGetRetryOnSameWithGetpropertyReturnsStringNum() {
-
- new MockUp<Configuration>() {
-
- @Mock
- private String getProperty(String defaultValue, String... keys) {
- return "1234";
- }
- };
-
- Configuration.INSTANCE.getRetrySameServer("test");
- assertNotNull(Configuration.INSTANCE.getRetrySameServer("test"));
- }
-
- @Test
- public void testIsRetryEnabledWithGetpropertyReturnsStringChar() {
-
- new MockUp<Configuration>() {
- @Mock
- private String getProperty(String defaultValue, String... keys) {
- return "tyt";
- }
- };
-
- Configuration.INSTANCE.isRetryEnabled("test");
- assertNotNull(Configuration.INSTANCE.isRetryEnabled("test"));
- }
-
- @Test
- public void testIsRetryEnabledWithGetpropertyReturnsStringNum() {
-
- new MockUp<Configuration>() {
-
- @Mock
- private String getProperty(String defaultValue, String... keys) {
- return "1234";
- }
- };
-
- Configuration.INSTANCE.isRetryEnabled("test");
- assertNotNull(Configuration.INSTANCE.isRetryEnabled("test"));
- }
-
- @Test
public void testGetSuccessiveFailedTimes() {
assertNotNull(Configuration.INSTANCE.getSuccessiveFailedTimes("test"));
}
diff --git a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestDefaultRetryhandler.java b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestDefaultRetryhandler.java
deleted file mode 100644
index b0bd4ab..0000000
--- a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestDefaultRetryhandler.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.servicecomb.loadbalance;
-
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.NoRouteToHostException;
-import java.net.SocketTimeoutException;
-
-import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.netflix.client.RetryHandler;
-
-import io.vertx.core.VertxException;
-
-public class TestDefaultRetryhandler {
-
- private static final String RETYR_NAME = "default";
-
- private static final String MICROSERVICE_NAME = "servicename";
-
- private RetryHandler retryHandler;
-
- @Before
- public void setup() {
- DefaultRetryExtensionsFactory factory = new DefaultRetryExtensionsFactory();
- retryHandler = factory.createRetryHandler(RETYR_NAME, MICROSERVICE_NAME);
- }
-
- @Test
- public void testRetryWithConnectionException() {
- Exception target = new ConnectException("connectin refused");
- Exception root = new Exception(target);
- boolean retriable = retryHandler.isRetriableException(root, false);
- Assert.assertTrue(retriable);
- }
-
- @Test
- public void testRetryWithSocketTimeout() {
- Exception target = new SocketTimeoutException("Read timed out");
- Exception root = new Exception(target);
- boolean retriable = retryHandler.isRetriableException(root, false);
- Assert.assertTrue(retriable);
- }
-
- @Test
- public void testRetryWithIOException() {
- Exception target = new IOException("Connection reset by peer");
- Exception root = new Exception(target);
- boolean retriable = retryHandler.isRetriableException(root, false);
- Assert.assertTrue(retriable);
-
- target = new IOException("Target not exist");
- root = new Exception(target);
- retriable = retryHandler.isRetriableException(root, false);
- Assert.assertFalse(retriable);
- }
-
- @Test
- public void testRetryVertxException() {
- Exception target = new VertxException("Connection was closed");
- Exception root = new Exception(target);
- boolean retriable = retryHandler.isRetriableException(root, false);
- Assert.assertTrue(retriable);
-
- target = new IOException("");
- root = new Exception(target);
- retriable = retryHandler.isRetriableException(root, false);
- Assert.assertFalse(retriable);
- }
-
- @Test
- public void testRetryNoRouteToHostException() {
- Exception target = new NoRouteToHostException("Host is unreachable");
- Exception root = new Exception(target);
- boolean retriable = retryHandler.isRetriableException(root, false);
- Assert.assertTrue(retriable);
-
- target = new NoRouteToHostException("No route to host");
- root = new Exception(target);
- retriable = retryHandler.isRetriableException(root, false);
- Assert.assertTrue(retriable);
- }
-
- @Test
- public void testRetryInvocation503() {
- Exception root = new InvocationException(503, "Service Unavailable", "Error");
- boolean retriable = retryHandler.isRetriableException(root, false);
- Assert.assertTrue(retriable);
- }
-
- @Test
- public void testRetryEqualTen() {
- Exception target = new ConnectException("connectin refused");
- for (int i = 0; i < 8; i++) {
- target = new Exception("Level" + i, target);
- }
- Exception root = new Exception(target);
- boolean retriable = retryHandler.isRetriableException(root, false);
- Assert.assertTrue(retriable);
- }
-
- @Test
- public void testRetryOverTen() {
- Exception target = new ConnectException("connectin refused");
- for (int i = 0; i < 9; i++) {
- target = new Exception("Level" + i, target);
- }
- Exception root = new Exception(target);
- boolean retriable = retryHandler.isRetriableException(root, false);
- Assert.assertFalse(retriable);
- }
-}
diff --git a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
index 3b1faac..f2b6788 100644
--- a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
+++ b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestExtensionsManager.java
@@ -16,23 +16,16 @@
*/
package org.apache.servicecomb.loadbalance;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import org.apache.servicecomb.config.ConfigUtil;
import org.apache.servicecomb.foundation.test.scaffolding.config.ArchaiusUtils;
-import org.apache.servicecomb.swagger.invocation.exception.InvocationException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import com.netflix.client.DefaultLoadBalancerRetryHandler;
-import com.netflix.client.RetryHandler;
-
import mockit.Deencapsulation;
public class TestExtensionsManager {
@@ -58,7 +51,6 @@
BeansHolder holder = new BeansHolder();
List<ExtensionsFactory> extensionsFactories = new ArrayList<>();
extensionsFactories.add(new RuleNameExtentionsFactory());
- extensionsFactories.add(new DefaultRetryExtensionsFactory());
Deencapsulation.setField(holder, "extentionsFactories", extensionsFactories);
holder.init();
@@ -83,18 +75,7 @@
BeansHolder holder = new BeansHolder();
List<ExtensionsFactory> extensionsFactories = new ArrayList<>();
extensionsFactories.add(new RuleNameExtentionsFactory());
- extensionsFactories.add(new DefaultRetryExtensionsFactory());
Deencapsulation.setField(holder, "extentionsFactories", extensionsFactories);
holder.init();
- RetryHandler retryHandler = ExtensionsManager.createRetryHandler("mytest1");
- Assert.assertTrue(DefaultLoadBalancerRetryHandler.class.isInstance(retryHandler));
- Assert.assertFalse(retryHandler.isRetriableException(new InvocationException(400, "", ""), false));
- Assert.assertFalse(retryHandler.isRetriableException(new InvocationException(400, "", ""), true));
- Assert.assertTrue(retryHandler.isRetriableException(new InvocationException(503, "", ""), true));
- Assert.assertTrue(retryHandler.isRetriableException(new ConnectException(), false));
- Assert.assertTrue(retryHandler.isRetriableException(new ConnectException(), true));
- Assert.assertTrue(retryHandler.isRetriableException(new SocketTimeoutException(), false));
- Assert.assertTrue(retryHandler.isRetriableException(new SocketTimeoutException(), true));
- Assert.assertFalse(retryHandler.isRetriableException(new IOException(), true));
}
}
diff --git a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadBalanceHandler2.java b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadBalanceHandler2.java
index 7d0938a..e68d8a1 100644
--- a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadBalanceHandler2.java
+++ b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadBalanceHandler2.java
@@ -880,65 +880,6 @@
}
/**
- * Two available instances, first time the normal instance is selected and failed. Then retry to the TRYING status
- * instance. In the whole procedure, the TRYING status instance should keep the TRYING status.
- */
- @Test
- public void first_normal_instance_then_trying_instance() {
- ExtensionsManager.addExtentionsFactory(new DefaultRetryExtensionsFactory());
- ArchaiusUtils.setProperty("servicecomb.loadbalance.retryEnabled", true);
- ArchaiusUtils.setProperty("servicecomb.loadbalance.retryOnNext", 1);
-
- ArrayList<ServiceCombServer> servers = new ArrayList<>();
- ServiceCombServer server0 = createMockedServer("instanceId0", "rest://127.0.0.1:8080");
- ServiceCombServer server1 = createMockedServer("instanceId1", "rest://127.0.0.1:8081");
- servers.add(server0);
- servers.add(server1);
-
- ServiceCombServerStats stats0 = mockServiceCombServerStats(server0, 0, false);
- ServiceCombServerStats stats1 = mockServiceCombServerStats(server1, 5, true);
-
- DiscoveryTree discoveryTree = createMockedDiscoveryTree(servers);
- LoadbalanceHandler handler = new LoadbalanceHandler(discoveryTree);
-
- Holder<Integer> counter = new Holder<>(0);
- Invocation invocation = new NonSwaggerInvocation("testApp", "testMicroserviceName", "0.0.0+",
- (inv, aysnc) -> {
- Assert.assertFalse(stats0.isIsolated());
- Assert.assertTrue(stats1.isIsolated());
- Assert.assertEquals(5, stats1.getContinuousFailureCount());
- Assert.assertFalse(ServiceCombServerStats.isolatedServerCanTry());
- if (counter.value == 0) {
- Assert.assertEquals("rest://127.0.0.1:8080", inv.getEndpoint().getEndpoint());
- Assert.assertEquals(0, stats0.getContinuousFailureCount());
- counter.value++;
- aysnc.producerFail(new InvocationException(503, "RETRY", "retry to next instance"));
- } else if (counter.value == 1) {
- Assert.assertEquals("rest://127.0.0.1:8081", inv.getEndpoint().getEndpoint());
- Assert.assertEquals(1, stats0.getContinuousFailureCount());
- counter.value++;
- aysnc.success("OK");
- } else {
- aysnc.producerFail(new InvocationException(400, "UNEXPECTED", "Unexpected Counter Value"));
- }
- });
-
- Assert.assertTrue(ServiceCombServerStats.applyForTryingChance(invocation));
- invocation.addLocalContext(IsolationDiscoveryFilter.TRYING_INSTANCES_EXISTING, true);
- try {
- handler.handle(invocation, (response) -> Assert.assertEquals("OK", response.getResult()));
- } catch (Exception e) {
- Assert.fail("unexpected exception " + e.getMessage());
- }
- Assert.assertEquals("rest://127.0.0.1:8081", invocation.getEndpoint().getEndpoint());
- Assert.assertFalse(stats0.isIsolated());
- Assert.assertEquals(1, stats0.getContinuousFailureCount());
- Assert.assertTrue(stats1.isIsolated());
- Assert.assertEquals(0, stats1.getContinuousFailureCount());
- Assert.assertTrue(ServiceCombServerStats.isolatedServerCanTry());
- }
-
- /**
* Mock the statistics of the specified {@code serviceCombServer}, set the failureCount and status.
* @return the ServiceCombServerStats object corresponding to the param {@code serviceCombServer}
*/
diff --git a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadbalanceHandler.java b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadbalanceHandler.java
index 754c6dd..026fd94 100644
--- a/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadbalanceHandler.java
+++ b/handlers/handler-loadbalance/src/test/java/org/apache/servicecomb/loadbalance/TestLoadbalanceHandler.java
@@ -107,6 +107,11 @@
void next(AsyncResponse asyncResp) throws Exception {
asyncResp.handle(sendResponse);
}
+
+ @Mock
+ public <T> T getLocalContext(String key) {
+ return (T) null;
+ }
};
new MockUp<TransportManager>(transportManager) {
@@ -126,7 +131,6 @@
BeansHolder holder = new BeansHolder();
List<ExtensionsFactory> extensionsFactories = new ArrayList<>();
extensionsFactories.add(new RuleNameExtentionsFactory());
- extensionsFactories.add(new DefaultRetryExtensionsFactory());
Deencapsulation.setField(holder, "extentionsFactories", extensionsFactories);
holder.init();
@@ -244,34 +248,10 @@
}
@Test
- public void sendWithRetry(@Injectable LoadBalancer loadBalancer) {
- Holder<String> result = new Holder<>();
- Deencapsulation.invoke(handler, "sendWithRetry", invocation, (AsyncResponse) resp -> {
- result.value = resp.getResult();
- }, loadBalancer);
-
- // no exception
- }
-
- @Test
public void testIsFailedResponse() {
Assert.assertFalse(handler.isFailedResponse(Response.create(400, "", "")));
Assert.assertFalse(handler.isFailedResponse(Response.create(500, "", "")));
Assert.assertTrue(handler.isFailedResponse(Response.create(490, "", "")));
Assert.assertTrue(handler.isFailedResponse(Response.consumerFailResp(new NullPointerException())));
}
-
- @Test
- public void retryPoolDaemon() throws ExecutionException, InterruptedException {
- ExecutorService RETRY_POOL = Deencapsulation.getField(handler, "RETRY_POOL");
-
- Holder<Thread> nameHolder = new Holder<>();
-
- RETRY_POOL.submit(() -> {
- nameHolder.value = Thread.currentThread();
- }).get();
-
- Assert.assertThat(nameHolder.value.getName(), Matchers.startsWith("retry-pool-thread-"));
- Assert.assertTrue(nameHolder.value.isDaemon());
- }
}