adjusting circuit breaker and retry behavior, adding priority customization handling and adding bulkhead metrics
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
index a9ce10a..52cb343 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
@@ -52,12 +52,14 @@
final Class<?> returnType = context.getMethod().getReturnType();
if (CompletionStage.class.isAssignableFrom(returnType)) {
- final CompletableFuture future = new CompletableFuture<>();
+ final ExtendedCompletableFuture<Object> future = newCompletableFuture(context);
getExecutor(context).execute(() -> {
try {
+ future.before();
final Object proceed = context.proceed();
final CompletionStage<?> stage = CompletionStage.class.cast(proceed);
stage.handle((r, e) -> {
+ future.after();
if (e != null) {
future.completeExceptionally(e);
} else {
@@ -72,10 +74,11 @@
return future;
}
if (Future.class.isAssignableFrom(returnType)) {
- final FutureWrapper<Object> facade = new FutureWrapper<>();
+ final FutureWrapper<Object> facade = newFuture(context);
getExecutor(context).execute(() -> {
final Object proceed;
try {
+ facade.before();
proceed = context.proceed();
facade.setDelegate(Future.class.cast(proceed));
} catch (final Exception e) {
@@ -91,12 +94,34 @@
"Should be Future or CompletionStage.");
}
- private static class FutureWrapper<T> implements Future<T> {
+ protected FutureWrapper<Object> newFuture(final InvocationContext context) {
+ return new FutureWrapper<>();
+ }
+
+ protected ExtendedCompletableFuture<Object> newCompletableFuture(final InvocationContext context) {
+ return new ExtendedCompletableFuture<>();
+ }
+
+ public static class ExtendedCompletableFuture<T> extends CompletableFuture<T> {
+ public void before() {
+ // no-op
+ }
+
+ public void after() {
+ // no-op
+ }
+ }
+
+ public static class FutureWrapper<T> implements Future<T> {
private final AtomicReference<Future<T>> delegate = new AtomicReference<>();
private final AtomicReference<Consumer<Future<T>>> cancelled = new AtomicReference<>();
private final CountDownLatch latch = new CountDownLatch(1);
- private void setDelegate(final Future<T> delegate) {
+ public void before() {
+ // no-op
+ }
+
+ public void setDelegate(final Future<T> delegate) {
final Consumer<Future<T>> cancelledTask = cancelled.get();
if (cancelledTask != null) {
cancelledTask.accept(delegate);
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
index b7f5799..f2998b6 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
@@ -25,8 +25,10 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PreDestroy;
import javax.annotation.Priority;
@@ -39,18 +41,16 @@
import org.apache.safeguard.impl.annotation.AnnotationFinder;
import org.apache.safeguard.impl.asynchronous.BaseAsynchronousInterceptor;
import org.apache.safeguard.impl.interceptor.IdGeneratorInterceptor;
+import org.apache.safeguard.impl.metrics.FaultToleranceMetrics;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.faulttolerance.exceptions.BulkheadException;
import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
-// todo: metrics
@Bulkhead
@Interceptor
@Priority(Interceptor.Priority.PLATFORM_AFTER + 5)
public class BulkheadInterceptor extends BaseAsynchronousInterceptor {
- private static final String EXECUTOR_KEY = BulkheadInterceptor.class.getName() + ".executor";
-
@Inject
private Cache cache;
@@ -66,25 +66,83 @@
}
}
if (model.useThreads) {
- context.getContextData().put(
- EXECUTOR_KEY + context.getContextData().get(IdGeneratorInterceptor.class.getName()), model.pool);
+ final Object id = context.getContextData().get(IdGeneratorInterceptor.class.getName());
+ context.getContextData().put(BulkheadInterceptor.class.getName() + ".model_" + id, model.pool);
+ context.getContextData().put(BulkheadInterceptor.class.getName() + "_" + id, model.pool);
return around(context);
} else {
if (!model.semaphore.tryAcquire()) {
+ model.callsRejected.inc();
throw new BulkheadException("No more permission available");
}
+ model.callsAccepted.inc();
+ model.concurrentCalls.incrementAndGet();
+ final long start = System.nanoTime();
try {
return context.proceed();
} finally {
+ model.executionDuration.update(System.nanoTime() - start);
model.semaphore.release();
+ model.concurrentCalls.decrementAndGet();
}
}
}
+ private Model getModel(final InvocationContext context) {
+ return Model.class.cast(context.getContextData().get(
+ BulkheadInterceptor.class.getName() + ".model_" +
+ context.getContextData().get(IdGeneratorInterceptor.class.getName())));
+ }
+
+ protected FutureWrapper<Object> newFuture(final InvocationContext context) {
+ return new ContextualFutureWrapper<>(getModel(context));
+ }
+
+ protected ExtendedCompletableFuture<Object> newCompletableFuture(final InvocationContext context) {
+ return new ContextualCompletableFuture<>(getModel(context));
+ }
+
@Override
protected Executor getExecutor(final InvocationContext context) {
return Executor.class.cast(context.getContextData()
- .get(EXECUTOR_KEY + context.getContextData().get(IdGeneratorInterceptor.class.getName())));
+ .get(BulkheadInterceptor.class.getName() + context.getContextData().get(IdGeneratorInterceptor.class.getName())));
+ }
+
+ private static class ContextualCompletableFuture<T> extends ExtendedCompletableFuture<T> {
+ private final Model model;
+
+ private ContextualCompletableFuture(final Model model) {
+ this.model = model;
+ }
+
+ @Override
+ public void before() {
+ model.concurrentCalls.incrementAndGet();
+ }
+
+ @Override
+ public void after() {
+ model.concurrentCalls.decrementAndGet();
+ }
+ }
+
+ private static class ContextualFutureWrapper<T> extends FutureWrapper<T> {
+ private final Model model;
+
+ private ContextualFutureWrapper(final Model model) {
+ this.model = model;
+ }
+
+ @Override
+ public void before() {
+ model.concurrentCalls.incrementAndGet();
+ }
+
+ @Override
+ public void setDelegate(final Future<T> delegate) {
+ model.concurrentCalls.decrementAndGet();
+ super.setDelegate(delegate);
+ }
}
static class Model {
@@ -93,8 +151,18 @@
private final boolean useThreads;
private final ThreadPoolExecutor pool;
private final Semaphore semaphore;
+ private final AtomicLong concurrentCalls = new AtomicLong();
+ private final ArrayBlockingQueue<Runnable> workQueue;
+ private final FaultToleranceMetrics.Counter callsAccepted;
+ private final FaultToleranceMetrics.Counter callsRejected;
+ private final FaultToleranceMetrics.Histogram executionDuration;
+ private final FaultToleranceMetrics.Histogram waitingDuration;
- private Model(final Bulkhead bulkhead, final boolean useThreads) {
+ private Model(final Bulkhead bulkhead, final boolean useThreads,
+ final FaultToleranceMetrics.Counter callsAccepted,
+ final FaultToleranceMetrics.Counter callsRejected,
+ final FaultToleranceMetrics.Histogram executionDuration,
+ final FaultToleranceMetrics.Histogram waitingDuration) {
this.value = bulkhead.value();
if (this.value <= 0) {
throw new FaultToleranceDefinitionException("Invalid value in @Bulkhead: " + value);
@@ -105,14 +173,37 @@
throw new FaultToleranceDefinitionException("Invalid value in @Bulkhead: " + value);
}
+ this.callsAccepted = callsAccepted;
+ this.callsRejected = callsRejected;
+ this.executionDuration = executionDuration;
+ this.waitingDuration = waitingDuration;
+
this.useThreads = useThreads;
- if (this.useThreads) {
- this.pool = new ThreadPoolExecutor(value, value, 0L, MILLISECONDS, new ArrayBlockingQueue<>(waitingQueue));
+ if (this.useThreads) { // important: use a pool dedicated for that concern and not a reusable one
+ this.workQueue = new ArrayBlockingQueue<>(waitingQueue);
+ this.pool = new ThreadPoolExecutor(value, value, 0L, MILLISECONDS, workQueue) {
+ @Override
+ public void execute(final Runnable command) {
+ final long submitted = System.nanoTime();
+ super.execute(() -> {
+ final long start = System.nanoTime();
+ waitingDuration.update(start - submitted);
+ try {
+ command.run();
+ } finally {
+ executionDuration.update(System.nanoTime() - start);
+ }
+ });
+ callsAccepted.inc();
+ }
+ };
this.pool.setRejectedExecutionHandler((r, executor) -> {
+ callsRejected.inc();
throw new BulkheadException("Can't accept task " + r);
});
this.semaphore = null;
} else {
+ this.workQueue = null;
this.pool = null;
this.semaphore = new Semaphore(value);
}
@@ -126,6 +217,9 @@
@Inject
private AnnotationFinder finder;
+ @Inject
+ private FaultToleranceMetrics metrics;
+
@PreDestroy
private void destroy() {
models.values().stream().filter(m -> m.pool != null).forEach(m -> m.pool.shutdownNow());
@@ -136,8 +230,36 @@
}
public Model create(final InvocationContext context) {
- return new Model(finder.findAnnotation(Bulkhead.class, context),
- finder.findAnnotation(Asynchronous.class, context) != null);
+ final boolean useThreads = finder.findAnnotation(Asynchronous.class, context) != null;
+
+ final String metricsNameBase = "ft." + context.getMethod().getDeclaringClass().getCanonicalName() + "."
+ + context.getMethod().getName() + ".bulkhead.";
+ final FaultToleranceMetrics.Counter callsAccepted = metrics.counter(metricsNameBase + "callsAccepted.total",
+ "Number of calls accepted by the bulkhead");
+ final FaultToleranceMetrics.Counter callsRejected = metrics.counter(metricsNameBase + "callsRejected.total",
+ "Number of calls rejected by the bulkhead");
+ final FaultToleranceMetrics.Histogram executionDuration = metrics.histogram(metricsNameBase + "executionDuration",
+ "Histogram of method execution times. This does not include any time spent waiting in the bulkhead queue.");
+ final FaultToleranceMetrics.Histogram waitingDuration;
+ if (useThreads) {
+ waitingDuration = metrics.histogram(metricsNameBase + "waiting.duration",
+ "Histogram of the time executions spend waiting in the queue");
+ } else {
+ waitingDuration = null;
+ }
+
+ final Model model = new Model(
+ finder.findAnnotation(Bulkhead.class, context), useThreads,
+ callsAccepted, callsRejected, executionDuration, waitingDuration);
+
+ metrics.gauge(metricsNameBase + "concurrentExecutions", "Number of currently running executions",
+ "none", model.concurrentCalls::get);
+ if (model.workQueue != null) {
+ metrics.gauge(metricsNameBase + "waitingQueue.population",
+ "Number of executions currently waiting in the queue", "none", () -> (long) model.workQueue.size());
+ }
+
+ return model;
}
}
}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/PriorityBinding.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/PriorityBinding.java
new file mode 100644
index 0000000..cfdaff9
--- /dev/null
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/PriorityBinding.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.safeguard.impl.cdi;
+
+import javax.annotation.Priority;
+import javax.enterprise.util.AnnotationLiteral;
+
+public class PriorityBinding extends AnnotationLiteral<Priority> implements Priority {
+ private final int value;
+
+ public PriorityBinding(final int value) {
+ this.value = value;
+ }
+
+ @Override
+ public int value() {
+ return value;
+ }
+}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
index 38e0fea..132180e 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
@@ -1,5 +1,6 @@
package org.apache.safeguard.impl.cdi;
+import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toList;
import java.lang.annotation.Annotation;
@@ -12,21 +13,31 @@
import java.util.concurrent.Executors;
import java.util.stream.Stream;
+import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Default;
import javax.enterprise.inject.spi.AfterBeanDiscovery;
import javax.enterprise.inject.spi.AnnotatedMethod;
+import javax.enterprise.inject.spi.AnnotatedType;
+import javax.enterprise.inject.spi.BeforeBeanDiscovery;
import javax.enterprise.inject.spi.Extension;
import javax.enterprise.inject.spi.ProcessAnnotatedType;
import javax.enterprise.inject.spi.ProcessBean;
import javax.enterprise.inject.spi.WithAnnotations;
+import javax.interceptor.Interceptor;
+import org.apache.safeguard.impl.asynchronous.AsynchronousInterceptor;
+import org.apache.safeguard.impl.bulkhead.BulkheadInterceptor;
+import org.apache.safeguard.impl.circuitbreaker.CircuitBreakerInterceptor;
import org.apache.safeguard.impl.config.GeronimoFaultToleranceConfig;
import org.apache.safeguard.impl.customizable.Safeguard;
import org.apache.safeguard.impl.fallback.FallbackInterceptor;
import org.apache.safeguard.impl.metrics.FaultToleranceMetrics;
+import org.apache.safeguard.impl.retry.AfterRetryInterceptor;
+import org.apache.safeguard.impl.retry.BeforeRetryInterceptor;
+import org.apache.safeguard.impl.timeout.TimeoutInterceptor;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.Bulkhead;
import org.eclipse.microprofile.faulttolerance.CircuitBreaker;
@@ -34,9 +45,44 @@
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;
-// todo: mp.fault.tolerance.interceptor.priority handling
public class SafeguardExtension implements Extension {
private boolean foundExecutor;
+ private GeronimoFaultToleranceConfig config;
+ private Integer priorityBase;
+
+ void grabInterceptorPriority(@Observes final BeforeBeanDiscovery beforeBeanDiscovery) {
+ config = GeronimoFaultToleranceConfig.create();
+ priorityBase = ofNullable(config.read("mp.fault.tolerance.interceptor.priority"))
+ .map(Integer::parseInt).orElse(null);
+ }
+
+ void customizeAsyncPriority(@Observes final ProcessAnnotatedType<AsynchronousInterceptor> interceptor) {
+ customizePriority(interceptor);
+ }
+
+ void customizeBulkHeadPriority(@Observes final ProcessAnnotatedType<BulkheadInterceptor> interceptor) {
+ customizePriority(interceptor);
+ }
+
+ void customizeCircuitbreakerPriority(@Observes final ProcessAnnotatedType<CircuitBreakerInterceptor> interceptor) {
+ customizePriority(interceptor);
+ }
+
+ void customizeFallbackPriority(@Observes final ProcessAnnotatedType<FallbackInterceptor> interceptor) {
+ customizePriority(interceptor);
+ }
+
+ void customizeBeforeRetryPriority(@Observes final ProcessAnnotatedType<BeforeRetryInterceptor> interceptor) {
+ customizePriority(interceptor);
+ }
+
+ void customizeAfterRetryPriority(@Observes final ProcessAnnotatedType<AfterRetryInterceptor> interceptor) {
+ customizePriority(interceptor);
+ }
+
+ void customizeTimeoutPriority(@Observes final ProcessAnnotatedType<TimeoutInterceptor> interceptor) {
+ customizePriority(interceptor);
+ }
void addFallbackInterceptor(@Observes final ProcessAnnotatedType<FallbackInterceptor> processAnnotatedType) {
processAnnotatedType.configureAnnotatedType().add(new FallbackBinding());
@@ -99,6 +145,16 @@
}
}
+ private void customizePriority(final ProcessAnnotatedType<?> type) {
+ if (priorityBase == null) {
+ return;
+ }
+ final int offset = type.getAnnotatedType().getAnnotation(Priority.class).value() - Interceptor.Priority.PLATFORM_AFTER;
+ type.configureAnnotatedType()
+ .remove(it -> it.annotationType() == Priority.class)
+ .add(new PriorityBinding(priorityBase + offset));
+ }
+
public Class<?> toClass(final Type it) {
return doToClass(it, 0);
}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
index 65bb493..218614e 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
@@ -44,7 +44,7 @@
@CircuitBreaker
@Interceptor
-@Priority(Interceptor.Priority.PLATFORM_AFTER + 2)
+@Priority(Interceptor.Priority.PLATFORM_AFTER + 12)
public class CircuitBreakerInterceptor implements Serializable {
@Inject
private Cache cache;
@@ -337,7 +337,7 @@
private static class CheckIntervalData {
private final int length;
- private final Boolean[] states; // todo: revise that
+ private final Boolean[] states; // todo: revise that but seems the spec sucks
private final long checkIntervalStart;
CheckIntervalData(final int length, final Boolean[] states, final long intervalStart) {
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/config/ConfigurationMapper.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/config/ConfigurationMapper.java
index 77f0d2d..3789f1f 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/config/ConfigurationMapper.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/config/ConfigurationMapper.java
@@ -26,9 +26,12 @@
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.time.temporal.ChronoUnit;
+import java.util.Arrays;
import java.util.stream.Stream;
import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.spi.AnnotatedType;
+import javax.enterprise.inject.spi.BeanManager;
import javax.inject.Inject;
@ApplicationScoped
@@ -36,6 +39,9 @@
@Inject
private GeronimoFaultToleranceConfig config;
+ @Inject
+ private BeanManager beanManager;
+
public <T extends Annotation> T map(final T instance, final Method sourceMethod, final Class<T> api) {
return api.cast(Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{api, Enabled.class}, (proxy, method, args) -> {
@@ -49,19 +55,35 @@
private <T extends Annotation> Object findConfiguredValue(final T instance, final Class<T> api,
final Method sourceMethod,
final Method proxyMethod, final Object[] args) {
- return ofNullable(ofNullable(findMethodConfiguration(api, sourceMethod, proxyMethod))
- .orElseGet(() -> ofNullable(findDefaultConfiguration(proxyMethod))
- .orElseGet(() -> ofNullable(findClassConfiguration(api, sourceMethod, proxyMethod)).orElse(null))))
+ final AnnotatedType<?> selected = beanManager.createAnnotatedType(sourceMethod.getDeclaringClass());
+ final boolean methodLevel = selected.getMethods().stream()
+ .filter(it -> it.getJavaMember().getName().equals(sourceMethod.getName()) &&
+ Arrays.equals(it.getJavaMember().getParameterTypes(), sourceMethod.getParameterTypes()))
+ .anyMatch(it -> it.isAnnotationPresent(api));
+ if (methodLevel) {
+ return ofNullable(findDefaultConfiguration(proxyMethod))
+ .map(v -> coerce(v, proxyMethod.getReturnType()))
+ .orElseGet(() -> ofNullable(findMethodConfiguration(api, sourceMethod, proxyMethod))
+ .map(v -> coerce(v, proxyMethod.getReturnType()))
+ .orElseGet(() -> getReflectionConfig(instance, proxyMethod, args)));
+ }
+ return ofNullable(findDefaultConfiguration(proxyMethod))
.map(v -> coerce(v, proxyMethod.getReturnType()))
- .orElseGet(() -> {
- try {
- return proxyMethod.invoke(instance, args);
- } catch (final IllegalAccessException e) {
- throw new IllegalStateException(e);
- } catch (final InvocationTargetException e) {
- throw new IllegalStateException(e.getTargetException());
- }
- });
+ .orElseGet(() -> ofNullable(findClassConfiguration(api, sourceMethod, proxyMethod))
+ .map(v -> coerce(v, proxyMethod.getReturnType()))
+ .orElseGet(() -> getReflectionConfig(instance, proxyMethod, args)));
+ }
+
+ private <T extends Annotation> Object getReflectionConfig(final T instance,
+ final Method proxyMethod,
+ final Object[] args) {
+ try {
+ return proxyMethod.invoke(instance, args);
+ } catch (final IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ } catch (final InvocationTargetException e) {
+ throw new IllegalStateException(e.getTargetException());
+ }
}
private String findDefaultConfiguration(final Method api) {
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java
index 141f2dc..d2c476d 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java
@@ -31,7 +31,7 @@
@Priority(Interceptor.Priority.PLATFORM_AFTER + 10)
public class AfterRetryInterceptor extends BaseRetryInterceptor {
@Override
- protected void executeFinalCounterAction(final Map<String, Object> contextData, final String counterActionKey,
+ protected void executeFinalCounterAction(final Map<String, Object> contextData,
final FaultToleranceMetrics.Counter counter) {
// can be used to push it back to the before interceptor:
// contextData.put(counterActionKey, (Runnable) counter::inc);
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
index 89acc9f..aa8aa49 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
@@ -27,7 +27,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import javax.enterprise.context.ApplicationScoped;
@@ -40,6 +39,7 @@
import org.apache.safeguard.impl.interceptor.IdGeneratorInterceptor;
import org.apache.safeguard.impl.metrics.FaultToleranceMetrics;
import org.eclipse.microprofile.faulttolerance.Retry;
+import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceException;
public abstract class BaseRetryInterceptor implements Serializable {
@@ -57,31 +57,31 @@
}
final Map<String, Object> contextData = context.getContextData();
- final String counterKey = BaseRetryInterceptor.class.getName() + ".counter_"
+ final String contextKey = BaseRetryInterceptor.class.getName() + ".context_"
+ contextData.get(IdGeneratorInterceptor.class.getName());
- final String counterActionKey = BaseRetryInterceptor.class.getName() + ".counterAction_"
- + contextData.get(IdGeneratorInterceptor.class.getName());
- AtomicInteger counter = AtomicInteger.class.cast(contextData.get(counterKey));
- if (counter == null) {
- counter = new AtomicInteger(model.maxRetries);
- contextData.put(counterKey, counter);
+ Context retryContext = Context.class.cast(contextData.get(contextKey));
+ if (retryContext == null) {
+ retryContext = new Context(System.nanoTime() + model.maxDuration, model.maxRetries);
+ contextData.put(contextKey, retryContext);
}
- while (counter.get() >= 0) {
+ while (retryContext.counter >= 0) {
try {
final Object proceed = context.proceed();
- if (counter.get() == model.maxRetries) {
- executeFinalCounterAction(contextData, counterActionKey, model.callsSucceededNotRetried);
+ if (retryContext.counter == model.maxRetries) {
+ executeFinalCounterAction(contextData, model.callsSucceededNotRetried);
} else {
- executeFinalCounterAction(contextData, counterActionKey, model.callsSucceededRetried);
+ executeFinalCounterAction(contextData, model.callsSucceededRetried);
}
return proceed;
+ } catch (final CircuitBreakerOpenException cboe) {
+ throw cboe;
} catch (final Exception re) {
// refresh the counter from the other interceptors
- counter = AtomicInteger.class.cast(contextData.get(counterKey));
+ retryContext = Context.class.cast(contextData.get(contextKey));
- if (model.abortOn(re) || counter.decrementAndGet() < 0) {
- executeFinalCounterAction(contextData, counterActionKey, model.callsFailed);
+ if (model.abortOn(re) || (--retryContext.counter) < 0 || System.nanoTime() >= retryContext.maxEnd) {
+ executeFinalCounterAction(contextData, model.callsFailed);
throw re;
}
if (!model.retryOn(re)) {
@@ -94,8 +94,7 @@
throw new FaultToleranceException("Inaccessible normally, here for compilation");
}
- protected abstract void executeFinalCounterAction(Map<String, Object> contextData, String counterActionKey,
- FaultToleranceMetrics.Counter counter);
+ protected abstract void executeFinalCounterAction(Map<String, Object> contextData, FaultToleranceMetrics.Counter counter);
static class Model {
@@ -186,4 +185,14 @@
metrics.counter(metricsNameBase + "retries.total", "The total number of times the method was retried"));
}
}
+
+ private static class Context {
+ private final long maxEnd;
+ private int counter;
+
+ private Context(final long maxEnd, final int maxRetries) {
+ this.maxEnd = maxEnd;
+ this.counter = maxRetries;
+ }
+ }
}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java
index cf8884c..06f7de6 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java
@@ -32,7 +32,6 @@
public class BeforeRetryInterceptor extends BaseRetryInterceptor {
@Override
protected void executeFinalCounterAction(final Map<String, Object> contextData,
- final String counterActionKey,
final FaultToleranceMetrics.Counter counter) {
counter.inc();
}
diff --git a/safeguard-impl/src/test/resources/dev.xml b/safeguard-impl/src/test/resources/dev.xml
index d0450ce..479c6d4 100644
--- a/safeguard-impl/src/test/resources/dev.xml
+++ b/safeguard-impl/src/test/resources/dev.xml
@@ -16,20 +16,15 @@
limitations under the License.
-->
<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
-<suite name="Dev Manual Test Run" verbose="2" configfailurepolicy="continue" >
- <test name="Manual Run">
+<suite name="Dev TCK Run" verbose="2" configfailurepolicy="continue" >
+ <test name="TCK Run">
<!-- all TCK -->
- <!--<packages>
- <package name="org.eclipse.microprofile.fault.tolerance.tck" />
+ <packages>
+ <package name="org.eclipse.microprofile.fault.tolerance.tck.*" />
</packages>
- -->
<classes>
- <class name="org.eclipse.microprofile.fault.tolerance.tck.AsynchronousTest"></class>
<!--
- <class name="org.eclipse.microprofile.fault.tolerance.tck.AsyncTimeoutTest"></class>
- <class name="org.eclipse.microprofile.fault.tolerance.tck.CircuitBreakerRetryTest"></class>
- <class name="org.eclipse.microprofile.fault.tolerance.tck.RetryTest"></class>
- <class name="org.eclipse.microprofile.fault.tolerance.tck.ConfigTest"></class>
+ <class name="org.eclipse.microprofile.fault.tolerance.tck.CircuitBreakerRetryTest" />
-->
</classes>
</test>