starting some work for mp-ft 2
diff --git a/pom.xml b/pom.xml
index d73d37a..7d4fa8a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,8 +73,8 @@
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
- <microprofile-fault-tolerance.version>1.1.3</microprofile-fault-tolerance.version>
- <owb.version>2.0.9</owb.version>
+ <microprofile-fault-tolerance.version>2.0.2</microprofile-fault-tolerance.version>
+ <owb.version>2.0.12</owb.version>
<arquillian.version>1.1.14.Final</arquillian.version>
<arquillian-weld-embedded.version>2.0.0.Final</arquillian-weld-embedded.version>
<cdi2-api.version>2.0</cdi2-api.version>
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java
index 40facfd..b9033ad 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java
@@ -44,7 +44,7 @@
private Cache cache;
@Override
- protected Executor getExecutor(final InvocationContext context) {
+ public Executor getExecutor(final InvocationContext context) {
return cache.getExecutor();
}
@@ -60,12 +60,12 @@
if (!enabled) {
return context.proceed();
}
- final String key = Asynchronous.class.getName() + ".skip_" +
- context.getContextData().get(IdGeneratorInterceptor.class.getName());
+ final String id = String.valueOf(context.getContextData().get(IdGeneratorInterceptor.class.getName()));
+ final String key = Asynchronous.class.getName() + ".skip_" + id;
if (context.getContextData().putIfAbsent(key, Boolean.TRUE) != null) { // bulkhead or so handling threading
return context.proceed();
}
- return around(context);
+ return around(context, id);
}
@ApplicationScoped
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
index 9626855..0e3666b 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
@@ -40,9 +40,9 @@
import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
public abstract class BaseAsynchronousInterceptor implements Serializable {
- protected abstract Executor getExecutor(InvocationContext context);
+ public abstract Executor getExecutor(InvocationContext context);
- protected Object around(final InvocationContext context) {
+ protected Object around(final InvocationContext context, final String id) {
final Class<?> returnType = context.getMethod().getReturnType();
if (CompletionStage.class.isAssignableFrom(returnType)) {
final ExtendedCompletableFuture<Object> future = newCompletableFuture(context);
@@ -55,19 +55,19 @@
future.after();
if (e != null) {
ofNullable(getErrorHandler(context.getContextData()))
- .map(eh -> {
- if (Exception.class.isInstance(e)) {
- try {
- eh.apply(Exception.class.cast(e));
- } catch (final Exception e1) {
- future.completeExceptionally(e1);
+ .map(eh -> {
+ if (Exception.class.isInstance(e)) {
+ try {
+ eh.apply(Exception.class.cast(e));
+ } catch (final Exception e1) {
+ future.completeExceptionally(e1);
+ }
+ } else {
+ future.completeExceptionally(e);
}
- } else {
- future.completeExceptionally(e);
- }
- return true;
- })
- .orElseGet(() -> future.completeExceptionally(e));
+ return true;
+ })
+ .orElseGet(() -> future.completeExceptionally(e));
} else {
future.complete(r);
}
@@ -81,18 +81,8 @@
}
if (Future.class.isAssignableFrom(returnType)) {
final FutureWrapper<Object> facade = newFuture(context, context.getContextData());
- getExecutor(context).execute(() -> {
- final Object proceed;
- try {
- facade.before();
- proceed = context.proceed();
- facade.setDelegate(Future.class.cast(proceed));
- } catch (final Exception e) {
- final CompletableFuture<Object> failingFuture = new CompletableFuture<>();
- failingFuture.completeExceptionally(e);
- facade.setDelegate(failingFuture);
- }
- });
+ context.getContextData().put(FutureWrapper.class.getName() + "_" + id, facade);
+ getExecutor(context).execute(new AsyncTask(context, facade));
return facade;
}
throw new FaultToleranceDefinitionException(
@@ -103,7 +93,7 @@
private static ErrorHandler<Exception, Future<?>> getErrorHandler(final Map<String, Object> contextData) {
return ErrorHandler.class.cast(
contextData.get(BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" +
- contextData.get(IdGeneratorInterceptor.class.getName())));
+ contextData.get(IdGeneratorInterceptor.class.getName())));
}
protected FutureWrapper<Object> newFuture(final InvocationContext context,
@@ -116,7 +106,7 @@
}
@FunctionalInterface
- public interface ErrorHandler<A, B> {
+ public interface ErrorHandler<A, B> {
B apply(A a) throws Exception;
}
@@ -134,6 +124,7 @@
}
public static class FutureWrapper<T> implements Future<T>, BaseFuture {
+ private FutureWrapper<?> parent;
private final AtomicReference<Future<T>> delegate = new AtomicReference<>();
private final AtomicReference<Consumer<Future<T>>> cancelled = new AtomicReference<>();
private final CountDownLatch latch = new CountDownLatch(1);
@@ -143,6 +134,17 @@
this.data = data;
}
+ public FutureWrapper<?> getParent() {
+ return parent;
+ }
+
+ public void setParent(final FutureWrapper<?> parent) {
+ if (this.parent != null) {
+ this.parent.setParent(parent);
+ }
+ this.parent = parent;
+ }
+
public void setDelegate(final Future<T> delegate) {
final Consumer<Future<T>> cancelledTask = cancelled.get();
if (cancelledTask != null) {
@@ -167,7 +169,7 @@
@Override
public boolean isDone() {
final Future<T> future = delegate.get();
- return future != null && future.isDone();
+ return future != null && future.isDone() && getErrorHandler(data) == null;
}
@Override
@@ -177,33 +179,33 @@
try {
return future.get();
} catch (final ExecutionException ee) {
- final Future<T> newFuture = onException(ee);
- delegate.set(newFuture);
- return newFuture.get();
+ delegate.set(onException(ee));
+ return delegate.get().get();
}
}
@Override
public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- final long latchWaitStart = System.nanoTime();
+ final long end = System.nanoTime() + unit.toNanos(timeout);
final boolean latchWait = latch.await(timeout, unit);
- final long latchWaitDuration = System.nanoTime() - latchWaitStart;
- if (!latchWait) {
+ final long readyNs = System.nanoTime();
+ if (!latchWait || readyNs > end) {
throw new TimeoutException();
}
try {
- return delegate.get().get(unit.toNanos(timeout) - latchWaitDuration, NANOSECONDS);
+ return delegate.get().get(Math.max(0, end - readyNs), NANOSECONDS);
} catch (final ExecutionException ee) {
delegate.set(onException(ee));
- final long duration = unit.toNanos(timeout) - (System.nanoTime() - latchWaitDuration);
- if (duration < 0) {
+ final long now = System.nanoTime();
+ if (now > end) {
throw new TimeoutException();
}
- return delegate.get().get(duration, NANOSECONDS);
+ return delegate.get().get(Math.max(0, end - now), NANOSECONDS);
}
}
- protected Future<T> onException(final Throwable cause) throws ExecutionException {
+ // todo: revise
+ protected Future<T> onException(final Throwable cause) {
if (!Exception.class.isInstance(cause)) {
if (Error.class.isInstance(cause)) {
throw Error.class.cast(cause);
@@ -216,25 +218,68 @@
try {
return (Future<T>) handler.apply(ex);
} catch (final Exception e) {
- if (ExecutionException.class.isInstance(e)) {
- throw ExecutionException.class.cast(e);
- }
if (RuntimeException.class.isInstance(e)) {
throw RuntimeException.class.cast(e);
}
if (Error.class.isInstance(e)) {
throw Error.class.cast(e);
}
+ if (ExecutionException.class.isInstance(cause) && cause.getCause() != cause) {
+ return failAsFuture(cause);
+ }
throw new IllegalStateException(e);
}
}
- if (ExecutionException.class.isInstance(cause)) {
- throw ExecutionException.class.cast(cause);
+ if (ExecutionException.class.isInstance(cause) && cause.getCause() != cause) {
+ final Throwable throwable = ExecutionException.class.cast(cause).getCause();
+ return onException(throwable);
}
if (RuntimeException.class.isInstance(cause)) {
throw RuntimeException.class.cast(cause);
}
throw new IllegalStateException(cause); // unreachable - just for compiler
}
+
+ private Future<T> failAsFuture(final Throwable cause) {
+ final Throwable throwable = ExecutionException.class.cast(cause).getCause();
+ final ExtendedCompletableFuture<T> future = new ExtendedCompletableFuture<>();
+ future.completeExceptionally(throwable);
+ return future;
+ }
+
+ public void failed(final Exception exception) {
+ final ExtendedCompletableFuture<T> delegate = new ExtendedCompletableFuture<>();
+ delegate.completeExceptionally(exception);
+ setDelegate(delegate);
+ latch.countDown();
+ }
+ }
+
+ public static class AsyncTask implements Runnable {
+ private final InvocationContext context;
+ private final FutureWrapper<Object> facade;
+
+ public AsyncTask(final InvocationContext context, final FutureWrapper<Object> facade) {
+ this.context = context;
+ this.facade = facade;
+ }
+
+ public FutureWrapper<Object> getFacade() {
+ return facade;
+ }
+
+ @Override
+ public void run() {
+ final Object proceed;
+ try {
+ facade.before();
+ proceed = context.proceed();
+ facade.setDelegate(Future.class.cast(proceed));
+ } catch (final Exception e) {
+ final CompletableFuture<Object> failingFuture = new CompletableFuture<>();
+ failingFuture.completeExceptionally(e);
+ facade.setDelegate(failingFuture);
+ }
+ }
}
}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadHandler.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadHandler.java
new file mode 100644
index 0000000..b54d170
--- /dev/null
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.safeguard.impl.bulkhead;
+
+public interface BulkheadHandler {
+ void release();
+
+ void acquire();
+}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
index 3a71118..9d51d3a 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
@@ -21,7 +21,6 @@
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -29,6 +28,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PreDestroy;
@@ -62,6 +62,32 @@
public Object bulkhead(final InvocationContext context) throws Exception {
final Map<Key, Model> models = cache.getModels();
final Key key = new Key(context, cache.getUnwrappedCache().getUnwrappedCache());
+ final Model model = getModel(context, models, key);
+ if (model.disabled) {
+ return context.proceed();
+ }
+ final Map<String, Object> data = context.getContextData();
+ final String id = String.valueOf(data.get(IdGeneratorInterceptor.class.getName()));
+ if (model.useThreads) {
+ data.put(BulkheadInterceptor.class.getName() + ".self_" + id, this);
+ data.put(BulkheadInterceptor.class.getName() + ".model_" + id, model);
+ data.put(BulkheadInterceptor.class.getName() + "_" + id, model.pool);
+ data.put(Asynchronous.class.getName() + ".skip_" + id, Boolean.TRUE);
+ return around(context, id);
+ }
+
+ final BulkheadHandler handler = new ReleaseHandler(model);
+ data.put(BulkheadHandler.class.getName() + "_" + id, handler);
+ handler.acquire();
+ try {
+ return context.proceed();
+ } finally {
+ handler.release();
+ data.remove(BulkheadHandler.class.getName() + "_" + id);
+ }
+ }
+
+ private Model getModel(final InvocationContext context, final Map<Key, Model> models, final Key key) {
Model model = models.get(key);
if (model == null) {
model = cache.create(context);
@@ -72,32 +98,7 @@
cache.postCreate(model, context);
}
}
- if (model.disabled) {
- return context.proceed();
- }
- if (model.useThreads) {
- final Map<String, Object> data = context.getContextData();
- final Object id = data.get(IdGeneratorInterceptor.class.getName());
- data.put(BulkheadInterceptor.class.getName() + ".model_" + id, model);
- data.put(BulkheadInterceptor.class.getName() + "_" + id, model.pool);
- data.put(Asynchronous.class.getName() + ".skip_" + id, Boolean.TRUE);
- return around(context);
- } else {
- if (!model.semaphore.tryAcquire()) {
- model.callsRejected.inc();
- throw new BulkheadException("No more permission available");
- }
- model.callsAccepted.inc();
- model.concurrentCalls.incrementAndGet();
- final long start = System.nanoTime();
- try {
- return context.proceed();
- } finally {
- model.executionDuration.update(System.nanoTime() - start);
- model.semaphore.release();
- model.concurrentCalls.decrementAndGet();
- }
- }
+ return model;
}
private Model getModel(final InvocationContext context) {
@@ -117,7 +118,7 @@
}
@Override
- protected Executor getExecutor(final InvocationContext context) {
+ public Executor getExecutor(final InvocationContext context) {
return Executor.class.cast(context.getContextData()
.get(BulkheadInterceptor.class.getName() + "_" + context.getContextData().get(IdGeneratorInterceptor.class.getName())));
}
@@ -212,20 +213,28 @@
}
}, (r, executor) -> {
callsRejected.inc();
- throw new BulkheadException("Can't accept task " + r);
+
+ final BulkheadException bulkheadException = new BulkheadException("Can't accept task " + r);
+ if (BulkheadAsyncTask.class.isInstance(r)) { // normally yes
+ final BulkheadAsyncTask bulkheadAsyncTask = BulkheadAsyncTask.class.cast(r);
+ if (AsyncTask.class.isInstance(bulkheadAsyncTask.command)) { // todo: handle a way to always unwrap
+ final AsyncTask asyncTask = AsyncTask.class.cast(bulkheadAsyncTask.command);
+ final FutureWrapper<Object> facade = asyncTask.getFacade();
+ facade.failed(bulkheadException);
+ if (facade.getParent() != null) {
+ facade.getParent().failed(bulkheadException);
+ }
+ return; // handled
+ }
+ }
+
+ // else just throw it (sync case)
+ throw bulkheadException;
}) {
@Override
public void execute(final Runnable command) {
final long submitted = System.nanoTime();
- super.execute(() -> {
- final long start = System.nanoTime();
- waitingDuration.update(start - submitted);
- try {
- command.run();
- } finally {
- executionDuration.update(System.nanoTime() - start);
- }
- });
+ super.execute(new BulkheadAsyncTask(waitingDuration, command, submitted));
callsAccepted.inc();
}
};
@@ -238,6 +247,30 @@
}
}
+ private static class BulkheadAsyncTask implements Runnable {
+ private final FaultToleranceMetrics.Histogram histogram;
+ private final Runnable command;
+ private final long submitted;
+
+ private BulkheadAsyncTask(final FaultToleranceMetrics.Histogram histogram,
+ final Runnable command, final long submitted) {
+ this.histogram = histogram;
+ this.command = command;
+ this.submitted = submitted;
+ }
+
+ @Override
+ public void run() {
+ final long start = System.nanoTime();
+ histogram.update(start - submitted);
+ try {
+ command.run();
+ } finally {
+ histogram.update(System.nanoTime() - start);
+ }
+ }
+ }
+
@ApplicationScoped
public static class Cache {
private final Map<Key, Model> models = new ConcurrentHashMap<>();
@@ -306,4 +339,36 @@
}
}
}
+
+ private static class ReleaseHandler implements BulkheadHandler {
+ private final Model model;
+ private final AtomicBoolean released = new AtomicBoolean();
+ private long start;
+
+ public ReleaseHandler(final Model model) {
+ this.model = model;
+ }
+
+ @Override
+ public void acquire() {
+ if (!model.semaphore.tryAcquire()) {
+ model.callsRejected.inc();
+ throw new BulkheadException("No more permission available");
+ }
+ model.callsAccepted.inc();
+ model.concurrentCalls.incrementAndGet();
+ start = System.nanoTime();
+ released.set(false);
+ }
+
+ @Override
+ public void release() {
+ if (!released.compareAndSet(false, true)) {
+ return;
+ }
+ model.executionDuration.update(System.nanoTime() - start);
+ model.semaphore.release();
+ model.concurrentCalls.decrementAndGet();
+ }
+ }
}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
index c3a85f9..013791b 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
@@ -77,6 +77,7 @@
import org.eclipse.microprofile.faulttolerance.Fallback;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;
+import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
public class SafeguardExtension implements Extension {
private boolean foundExecutor;
@@ -284,10 +285,12 @@
final AnnotatedType<?> annotatedType = AnnotatedType.class.cast(type);
try {
annotatedType.getMethods().stream()
- .filter(it -> classHasMarker || it.isAnnotationPresent(marker))
- .map(m -> new MockInvocationContext(m.getJavaMember()))
- .forEach(contextConsumer);
+ .filter(it -> classHasMarker || it.isAnnotationPresent(marker))
+ .map(m -> new MockInvocationContext(m.getJavaMember()))
+ .forEach(contextConsumer);
return null;
+ } catch (final FaultToleranceDefinitionException ftde) {
+ throw ftde;
} catch (final RuntimeException re) {
return new DefinitionException(re);
}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java
index 3b5b5e9..21f75a1 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java
@@ -180,15 +180,10 @@
handler = fallbackHandler;
} else {
try {
- final Method fallbackMethod = ofNullable(context.getTarget())
- .map(Object::getClass)
- .orElseGet(() -> Class.class.cast(context.getMethod().getDeclaringClass()))
- .getMethod(method, context.getMethod().getParameterTypes());
- if (!extension.toClass(context.getMethod()
- .getReturnType())
- .isAssignableFrom(extension.toClass(fallbackMethod.getReturnType())) || !Arrays.equals(
- context.getMethod()
- .getParameterTypes(), fallbackMethod.getParameterTypes())) {
+ final Method fallbackMethod = getFallbackMethod(context, method);
+ if (!extension.toClass(context.getMethod().getReturnType())
+ .isAssignableFrom(extension.toClass(fallbackMethod.getReturnType())) ||
+ !Arrays.equals(context.getMethod().getParameterTypes(), fallbackMethod.getParameterTypes())) {
throw new FaultToleranceDefinitionException("handler method does not match method: " + context.getMethod());
}
if (!fallbackMethod.isAccessible()) {
@@ -225,6 +220,30 @@
return handler.handle(context12);
};
}
+
+ private Method getFallbackMethod(final InvocationContext context, final String method) throws NoSuchMethodException {
+ final Class<?> rootClass = ofNullable(context.getTarget())
+ .map(Object::getClass)
+ .orElseGet(() -> Class.class.cast(context.getMethod().getDeclaringClass()));
+ Class<?> current = rootClass;
+ while (current != null) {
+ try {
+ return current.getDeclaredMethod(method, context.getMethod().getParameterTypes());
+ } catch (final NoSuchMethodException nsme) {
+ current = current.getSuperclass();
+ }
+ }
+ return Stream.of(rootClass.getInterfaces())
+ .flatMap(c -> {
+ try {
+ return Stream.of(c.getDeclaredMethod(method, context.getMethod().getParameterTypes()));
+ } catch (final NoSuchMethodException nsme) {
+ return Stream.empty();
+ }
+ })
+ .findFirst()
+ .orElseThrow(() -> new NoSuchMethodException(method + " as fallback for " + context.getMethod()));
+ }
}
private interface EnrichedExecutionContext extends ExecutionContext {
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java
index d2c476d..675b4a5 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java
@@ -31,6 +31,11 @@
@Priority(Interceptor.Priority.PLATFORM_AFTER + 10)
public class AfterRetryInterceptor extends BaseRetryInterceptor {
@Override
+ protected boolean suspendBulkhead() {
+ return true;
+ }
+
+ @Override
protected void executeFinalCounterAction(final Map<String, Object> contextData,
final FaultToleranceMetrics.Counter counter) {
// can be used to push it back to the before interceptor:
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
index 61c7ba8..69ed1f4 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
@@ -24,10 +24,13 @@
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import javax.enterprise.context.ApplicationScoped;
@@ -37,11 +40,14 @@
import org.apache.safeguard.impl.annotation.AnnotationFinder;
import org.apache.safeguard.impl.asynchronous.BaseAsynchronousInterceptor;
+import org.apache.safeguard.impl.bulkhead.BulkheadHandler;
+import org.apache.safeguard.impl.bulkhead.BulkheadInterceptor;
import org.apache.safeguard.impl.cache.Key;
import org.apache.safeguard.impl.cache.UnwrappedCache;
import org.apache.safeguard.impl.config.ConfigurationMapper;
import org.apache.safeguard.impl.interceptor.IdGeneratorInterceptor;
import org.apache.safeguard.impl.metrics.FaultToleranceMetrics;
+import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
@@ -56,6 +62,134 @@
public Object retry(final InvocationContext context) throws Exception {
final Map<Key, Model> models = cache.getModels();
final Key cacheKey = new Key(context, cache.getUnwrapped());
+ final Model model = getModel(context, models, cacheKey);
+ if (model.disabled) {
+ return context.proceed();
+ }
+ final Map<String, Object> contextData = context.getContextData();
+ final String id = String.valueOf(contextData.get(IdGeneratorInterceptor.class.getName()));
+ final String contextKey = BaseRetryInterceptor.class.getName() + ".context_" + id;
+ Context retryContext = Context.class.cast(contextData.get(contextKey));
+ if (retryContext == null) {
+ retryContext = new Context(System.nanoTime() + model.maxDuration, model.maxRetries);
+ contextData.put(contextKey, retryContext);
+ }
+
+ // todo: make it more elegant, maybe an event
+ final BulkheadHandler bulkheadHandler = suspendBulkhead() ?
+ getSyncBulkheadHandler(contextData, id) : null;
+ final BulkheadInterceptor bulkheadInterceptor = suspendBulkhead() ?
+ getThreadedBulkHeadInterceptor(contextData, id) : null;
+
+ final AtomicBoolean mustAcquireBulkheadPerm = new AtomicBoolean(false);
+ while (retryContext.counter >= 0) {
+ try {
+ return retryIteration(context, model, id, contextKey, retryContext,
+ bulkheadInterceptor, bulkheadHandler, mustAcquireBulkheadPerm);
+ } catch (final Exception re) {
+ try {
+ retryContext = handleException(contextData, contextKey, model, re, bulkheadHandler);
+ } catch (final Exception e) {
+ if (isAsync(context)) {
+ final CompletableFuture<Object> future = new BaseAsynchronousInterceptor.ExtendedCompletableFuture<>();
+ future.completeExceptionally(re);
+ // avoid to call it yet another time
+ contextData.remove(BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" + id);
+ return future;
+ }
+ throw e;
+ }
+ mustAcquireBulkheadPerm.set(bulkheadHandler != null);
+ }
+ }
+ throw new FaultToleranceException("Inaccessible normally, here for compilation");
+ }
+
+ private BulkheadHandler getSyncBulkheadHandler(final Map<String, Object> contextData, final Object id) {
+ return BulkheadHandler.class.cast(contextData.get(BulkheadHandler.class.getName() + "_" + id));
+ }
+
+ private BulkheadInterceptor getThreadedBulkHeadInterceptor(final Map<String, Object> contextData, final Object id) {
+ return BulkheadInterceptor.class.cast(contextData.get(BulkheadInterceptor.class.getName() + ".self_" + id));
+ }
+
+ private boolean isAsync(final InvocationContext context) {
+ final Map<String, Object> contextData = context.getContextData();
+ return contextData.get(Asynchronous.class.getName() + ".skip_" +
+ contextData.get(IdGeneratorInterceptor.class.getName())) != null;
+ }
+
+ private Object retryIteration(final InvocationContext context, final Model model,
+ final String id, final String contextKey,
+ final Context retryContext,
+ final BulkheadInterceptor bulkheadInterceptor,
+ final BulkheadHandler bulkheadHandler,
+ final AtomicBoolean mustAcquireBulkheadPerm) throws Exception {
+ if (mustAcquireBulkheadPerm.get()) {
+ bulkheadHandler.acquire();
+ }
+ final Object proceed = bulkheadInterceptor != null ?
+ resubmitThroughBulkhead(context, bulkheadInterceptor, id) : context.proceed();
+ final Map<String, Object> contextData = context.getContextData();
+ if (retryContext.counter == model.maxRetries) {
+ executeFinalCounterAction(contextData, model.callsSucceededNotRetried);
+ } else {
+ executeFinalCounterAction(contextData, model.callsSucceededRetried);
+ }
+ if (BaseAsynchronousInterceptor.BaseFuture.class.isInstance(proceed)) {
+ final Model modelRef = model;
+ final String key = BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" + id;
+ contextData.put(
+ key,
+ (BaseAsynchronousInterceptor.ErrorHandler<Exception, Future<?>>) error -> {
+ // todo: in async mode we shouldnt wait with "sleep" there - either a SES or a CS/CF
+ handleException(contextData, contextKey, modelRef, error, bulkheadHandler);
+ mustAcquireBulkheadPerm.set(bulkheadHandler != null);
+ final Context ctx = Context.class.cast(contextData.get(contextKey));
+ if (ctx != null && ctx.counter >= 0) {
+ return Future.class.cast(retryIteration(
+ context, model, id, contextKey, retryContext,
+ bulkheadInterceptor, bulkheadHandler, mustAcquireBulkheadPerm));
+ }
+
+ final BaseAsynchronousInterceptor.ExtendedCompletableFuture<Object> future = new BaseAsynchronousInterceptor.ExtendedCompletableFuture<>();
+ future.completeExceptionally(error);
+ contextData.remove(key);
+ return future;
+ });
+ if (CompletionStage.class.isInstance(proceed) && mustAcquireBulkheadPerm.get()) {
+ return CompletionStage.class.cast(proceed)
+ .whenComplete((r, e) -> bulkheadHandler.release());
+ }
+ }
+ if (mustAcquireBulkheadPerm.get()) {
+ bulkheadHandler.release();
+ }
+ return proceed;
+ }
+
+ private Object resubmitThroughBulkhead(final InvocationContext context,
+ final BulkheadInterceptor bulkheadInterceptor,
+ final String id) throws Exception {
+ if (CompletionStage.class.isAssignableFrom(context.getMethod().getReturnType())) {
+ return bulkheadInterceptor.bulkhead(context);
+ }
+ final Map<String, Object> data = context.getContextData();
+ final Object pool = data.get(BulkheadInterceptor.class.getName() + "_" + data.get(IdGeneratorInterceptor.class.getName()));
+ if (pool != null) {
+ final BaseAsynchronousInterceptor.FutureWrapper<?> parent = BaseAsynchronousInterceptor.FutureWrapper.class.cast(data.get(BaseAsynchronousInterceptor.FutureWrapper.class.getName() + "_" + id));
+ final BaseAsynchronousInterceptor.FutureWrapper wrapper = new BaseAsynchronousInterceptor.FutureWrapper<>(data);
+ if (parent != null) {
+ wrapper.setParent(parent);
+ }
+ final BaseAsynchronousInterceptor.AsyncTask task = new BaseAsynchronousInterceptor.AsyncTask(context, wrapper);
+ bulkheadInterceptor.getExecutor(context).execute(task);
+ return wrapper;
+ }
+ return bulkheadInterceptor.bulkhead(context);
+ }
+
+ private Model getModel(final InvocationContext context, final Map<Key, Model> models, final Key cacheKey) {
Model model = models.get(cacheKey);
if (model == null) {
model = cache.create(context);
@@ -64,44 +198,18 @@
model = existing;
}
}
- if (model.disabled) {
- return context.proceed();
- }
- final Map<String, Object> contextData = context.getContextData();
- final Object id = contextData.get(IdGeneratorInterceptor.class.getName());
- final String contextKey = BaseRetryInterceptor.class.getName() + ".context_" + id;
- Context retryContext = Context.class.cast(contextData.get(contextKey));
- if (retryContext == null) {
- retryContext = new Context(System.nanoTime() + model.maxDuration, model.maxRetries);
- contextData.put(contextKey, retryContext);
- }
-
- while (retryContext.counter >= 0) { // todo: handle async if result is a Future or CompletionStage (weird no?)
- try {
- final Object proceed = context.proceed();
- if (retryContext.counter == model.maxRetries) {
- executeFinalCounterAction(contextData, model.callsSucceededNotRetried);
- } else {
- executeFinalCounterAction(contextData, model.callsSucceededRetried);
- }
- if (BaseAsynchronousInterceptor.BaseFuture.class.isInstance(proceed)) {
- final Model modelRef = model;
- contextData.put(BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" + id,
- (BaseAsynchronousInterceptor.ErrorHandler<Exception, Future<?>>) error -> {
- handleException(contextData, contextKey, modelRef, error);
- return Future.class.cast(context.proceed());
- });
- }
- return proceed;
- } catch (final Exception re) {
- retryContext = handleException(contextData, contextKey, model, re);
- }
- }
- throw new FaultToleranceException("Inaccessible normally, here for compilation");
+ return model;
}
+ protected abstract boolean suspendBulkhead();
+
private Context handleException(final Map<String, Object> contextData, final String contextKey,
- final Model modelRef, final Exception error) throws Exception {
+ final Model modelRef, final Exception error, final BulkheadHandler bulkheadHandler) throws Exception {
+ // if we retry, we release bulkhead to avoid to hold it while we pause
+ if (bulkheadHandler != null) {
+ bulkheadHandler.release();
+ }
+
if (CircuitBreakerOpenException.class.isInstance(error)) {
throw error;
}
@@ -109,13 +217,14 @@
// refresh the counter from the other interceptors
final Context ctx = Context.class.cast(contextData.get(contextKey));
- if (modelRef.abortOn(error) || (--ctx.counter) < 0 || System.nanoTime() >= ctx.maxEnd) {
+ if (modelRef.abortOn(error) || ctx.decr() < 0 || System.nanoTime() >= ctx.maxEnd) {
executeFinalCounterAction(contextData, modelRef.callsFailed);
throw error;
}
if (!modelRef.retryOn(error)) {
throw error;
}
+
modelRef.retries.inc();
final long pause = modelRef.nextPause();
if (pause > 0) {
@@ -157,7 +266,7 @@
this.disabled = disabled;
this.abortOn = retry.abortOn();
this.retryOn = retry.retryOn();
- this.maxDuration = retry.delayUnit().getDuration().toNanos() * retry.maxDuration();
+ this.maxDuration = retry.durationUnit().getDuration().toNanos() * retry.maxDuration();
this.maxRetries = retry.maxRetries();
this.delay = retry.delayUnit().getDuration().toNanos() * retry.delay();
this.jitter = retry.jitterDelayUnit().getDuration().toNanos() * retry.jitter();
@@ -197,8 +306,9 @@
private long nextPause() {
final ThreadLocalRandom random = ThreadLocalRandom.current();
- return TimeUnit.NANOSECONDS
- .toMillis(min(maxDuration, max(0, ((random.nextBoolean() ? 1 : -1) * delay) + (jitter == 0 ? 0 : random.nextLong(jitter)))));
+ final long nextDelay = (random.nextBoolean() ? 1 : -1) * delay;
+ final long nextJitter = jitter == 0 ? 0 : random.nextLong(jitter);
+ return TimeUnit.NANOSECONDS.toMillis(min(maxDuration, max(0, nextDelay + nextJitter)));
}
}
@@ -253,5 +363,9 @@
this.maxEnd = maxEnd;
this.counter = maxRetries;
}
+
+ private synchronized int decr() {
+ return --counter;
+ }
}
}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java
index 06f7de6..90f889d 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java
@@ -31,6 +31,11 @@
@Priority(Interceptor.Priority.PLATFORM_AFTER)
public class BeforeRetryInterceptor extends BaseRetryInterceptor {
@Override
+ protected boolean suspendBulkhead() {
+ return false;
+ }
+
+ @Override
protected void executeFinalCounterAction(final Map<String, Object> contextData,
final FaultToleranceMetrics.Counter counter) {
counter.inc();
diff --git a/safeguard-impl/src/test/resources/dev.xml b/safeguard-impl/src/test/resources/dev.xml
index bfbcf47..ff82e8d 100644
--- a/safeguard-impl/src/test/resources/dev.xml
+++ b/safeguard-impl/src/test/resources/dev.xml
@@ -19,11 +19,19 @@
<suite name="Dev TCK Run" verbose="2" configfailurepolicy="continue" >
<test name="TCK Run">
<!-- all TCK -->
+ <!--
<packages>
<package name="org.eclipse.microprofile.fault.tolerance.tck.*" />
</packages>
+ -->
<!--
to debug a single test (class or method), comment packages and add <classes>
-->
+ <classes>
+ <class name="org.eclipse.microprofile.fault.tolerance.tck.bulkhead.BulkheadAsynchRetryTest">
+ <methods>
+ </methods>
+ </class>
+ </classes>
</test>
</suite>