Start work on MicroProfile Fault Tolerance 2.0 (mp-ft 2) support
diff --git a/pom.xml b/pom.xml
index d73d37a..7d4fa8a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,8 +73,8 @@
     <properties>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
-        <microprofile-fault-tolerance.version>1.1.3</microprofile-fault-tolerance.version>
-        <owb.version>2.0.9</owb.version>
+        <microprofile-fault-tolerance.version>2.0.2</microprofile-fault-tolerance.version>
+        <owb.version>2.0.12</owb.version>
         <arquillian.version>1.1.14.Final</arquillian.version>
         <arquillian-weld-embedded.version>2.0.0.Final</arquillian-weld-embedded.version>
         <cdi2-api.version>2.0</cdi2-api.version>
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java
index 40facfd..b9033ad 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java
@@ -44,7 +44,7 @@
     private Cache cache;
 
     @Override
-    protected Executor getExecutor(final InvocationContext context) {
+    public Executor getExecutor(final InvocationContext context) {
         return cache.getExecutor();
     }
 
@@ -60,12 +60,12 @@
         if (!enabled) {
             return context.proceed();
         }
-        final String key = Asynchronous.class.getName() + ".skip_" +
-                context.getContextData().get(IdGeneratorInterceptor.class.getName());
+        final String id = String.valueOf(context.getContextData().get(IdGeneratorInterceptor.class.getName()));
+        final String key = Asynchronous.class.getName() + ".skip_" + id;
         if (context.getContextData().putIfAbsent(key, Boolean.TRUE) != null) { // bulkhead or so handling threading
             return context.proceed();
         }
-        return around(context);
+        return around(context, id);
     }
 
     @ApplicationScoped
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
index 9626855..0e3666b 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
@@ -40,9 +40,9 @@
 import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
 
 public abstract class BaseAsynchronousInterceptor implements Serializable {
-    protected abstract Executor getExecutor(InvocationContext context);
+    public abstract Executor getExecutor(InvocationContext context);
 
-    protected Object around(final InvocationContext context) {
+    protected Object around(final InvocationContext context, final String id) {
         final Class<?> returnType = context.getMethod().getReturnType();
         if (CompletionStage.class.isAssignableFrom(returnType)) {
             final ExtendedCompletableFuture<Object> future = newCompletableFuture(context);
@@ -55,19 +55,19 @@
                         future.after();
                         if (e != null) {
                             ofNullable(getErrorHandler(context.getContextData()))
-                                .map(eh -> {
-                                    if (Exception.class.isInstance(e)) {
-                                        try {
-                                            eh.apply(Exception.class.cast(e));
-                                        } catch (final Exception e1) {
-                                            future.completeExceptionally(e1);
+                                    .map(eh -> {
+                                        if (Exception.class.isInstance(e)) {
+                                            try {
+                                                eh.apply(Exception.class.cast(e));
+                                            } catch (final Exception e1) {
+                                                future.completeExceptionally(e1);
+                                            }
+                                        } else {
+                                            future.completeExceptionally(e);
                                         }
-                                    } else {
-                                        future.completeExceptionally(e);
-                                    }
-                                    return true;
-                                })
-                                .orElseGet(() -> future.completeExceptionally(e));
+                                        return true;
+                                    })
+                                    .orElseGet(() -> future.completeExceptionally(e));
                         } else {
                             future.complete(r);
                         }
@@ -81,18 +81,8 @@
         }
         if (Future.class.isAssignableFrom(returnType)) {
             final FutureWrapper<Object> facade = newFuture(context, context.getContextData());
-            getExecutor(context).execute(() -> {
-                final Object proceed;
-                try {
-                    facade.before();
-                    proceed = context.proceed();
-                    facade.setDelegate(Future.class.cast(proceed));
-                } catch (final Exception e) {
-                    final CompletableFuture<Object> failingFuture = new CompletableFuture<>();
-                    failingFuture.completeExceptionally(e);
-                    facade.setDelegate(failingFuture);
-                }
-            });
+            context.getContextData().put(FutureWrapper.class.getName() + "_" + id, facade);
+            getExecutor(context).execute(new AsyncTask(context, facade));
             return facade;
         }
         throw new FaultToleranceDefinitionException(
@@ -103,7 +93,7 @@
     private static ErrorHandler<Exception, Future<?>> getErrorHandler(final Map<String, Object> contextData) {
         return ErrorHandler.class.cast(
                 contextData.get(BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" +
-                contextData.get(IdGeneratorInterceptor.class.getName())));
+                        contextData.get(IdGeneratorInterceptor.class.getName())));
     }
 
     protected FutureWrapper<Object> newFuture(final InvocationContext context,
@@ -116,7 +106,7 @@
     }
 
     @FunctionalInterface
-    public interface ErrorHandler<A, B>  {
+    public interface ErrorHandler<A, B> {
         B apply(A a) throws Exception;
     }
 
@@ -134,6 +124,7 @@
     }
 
     public static class FutureWrapper<T> implements Future<T>, BaseFuture {
+        private FutureWrapper<?> parent;
         private final AtomicReference<Future<T>> delegate = new AtomicReference<>();
         private final AtomicReference<Consumer<Future<T>>> cancelled = new AtomicReference<>();
         private final CountDownLatch latch = new CountDownLatch(1);
@@ -143,6 +134,17 @@
             this.data = data;
         }
 
+        public FutureWrapper<?> getParent() {
+            return parent;
+        }
+
+        public void setParent(final FutureWrapper<?> parent) {
+            if (this.parent != null) {
+                this.parent.setParent(parent);
+            }
+            this.parent = parent;
+        }
+
         public void setDelegate(final Future<T> delegate) {
             final Consumer<Future<T>> cancelledTask = cancelled.get();
             if (cancelledTask != null) {
@@ -167,7 +169,7 @@
         @Override
         public boolean isDone() {
             final Future<T> future = delegate.get();
-            return future != null && future.isDone();
+            return future != null && future.isDone() && getErrorHandler(data) == null;
         }
 
         @Override
@@ -177,33 +179,33 @@
             try {
                 return future.get();
             } catch (final ExecutionException ee) {
-                final Future<T> newFuture = onException(ee);
-                delegate.set(newFuture);
-                return newFuture.get();
+                delegate.set(onException(ee));
+                return delegate.get().get();
             }
         }
 
         @Override
         public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-            final long latchWaitStart = System.nanoTime();
+            final long end = System.nanoTime() + unit.toNanos(timeout); // single deadline for latch, delegate and fallback waits; may wrap for huge timeouts
             final boolean latchWait = latch.await(timeout, unit);
-            final long latchWaitDuration = System.nanoTime() - latchWaitStart;
-            if (!latchWait) {
+            final long readyNs = System.nanoTime();
+            if (!latchWait || readyNs - end > 0) { // nanoTime values are only comparable by difference (overflow-safe)
                 throw new TimeoutException();
             }
             try {
-                return delegate.get().get(unit.toNanos(timeout) - latchWaitDuration, NANOSECONDS);
+                return delegate.get().get(Math.max(0, end - readyNs), NANOSECONDS);
             } catch (final ExecutionException ee) {
                 delegate.set(onException(ee));
-                final long duration = unit.toNanos(timeout) - (System.nanoTime() - latchWaitDuration);
-                if (duration < 0) {
+                final long now = System.nanoTime();
+                if (now - end > 0) { // same overflow-safe deadline check before waiting on the replacement future
                     throw new TimeoutException();
                 }
-                return delegate.get().get(duration, NANOSECONDS);
+                return delegate.get().get(Math.max(0, end - now), NANOSECONDS);
             }
         }
 
-        protected Future<T> onException(final Throwable cause) throws ExecutionException {
+        // todo: revise
+        protected Future<T> onException(final Throwable cause) {
             if (!Exception.class.isInstance(cause)) {
                 if (Error.class.isInstance(cause)) {
                     throw Error.class.cast(cause);
@@ -216,25 +218,68 @@
                 try {
                     return (Future<T>) handler.apply(ex);
                 } catch (final Exception e) {
-                    if (ExecutionException.class.isInstance(e)) {
-                        throw ExecutionException.class.cast(e);
-                    }
                     if (RuntimeException.class.isInstance(e)) {
                         throw RuntimeException.class.cast(e);
                     }
                     if (Error.class.isInstance(e)) {
                         throw Error.class.cast(e);
                     }
+                    if (ExecutionException.class.isInstance(cause) && cause.getCause() != cause) {
+                        return failAsFuture(cause);
+                    }
                     throw new IllegalStateException(e);
                 }
             }
-            if (ExecutionException.class.isInstance(cause)) {
-                throw ExecutionException.class.cast(cause);
+            if (ExecutionException.class.isInstance(cause) && cause.getCause() != cause) {
+                final Throwable throwable = ExecutionException.class.cast(cause).getCause();
+                return onException(throwable);
             }
             if (RuntimeException.class.isInstance(cause)) {
                 throw RuntimeException.class.cast(cause);
             }
             throw new IllegalStateException(cause); // unreachable - just for compiler
         }
+
+        private Future<T> failAsFuture(final Throwable cause) {
+            final Throwable throwable = ExecutionException.class.cast(cause).getCause();
+            final ExtendedCompletableFuture<T> future = new ExtendedCompletableFuture<>();
+            future.completeExceptionally(throwable);
+            return future;
+        }
+
+        public void failed(final Exception exception) {
+            final ExtendedCompletableFuture<T> delegate = new ExtendedCompletableFuture<>();
+            delegate.completeExceptionally(exception);
+            setDelegate(delegate);
+            latch.countDown();
+        }
+    }
+
+    public static class AsyncTask implements Runnable { // runs the intercepted call and hands its resulting Future to the facade
+        private final InvocationContext context;
+        private final FutureWrapper<Object> facade;
+
+        public AsyncTask(final InvocationContext context, final FutureWrapper<Object> facade) {
+            this.context = context;
+            this.facade = facade;
+        }
+
+        public FutureWrapper<Object> getFacade() { // read by the bulkhead pool rejection handler to fail the facade
+            return facade;
+        }
+
+        @Override
+        public void run() {
+            final Object proceed;
+            try {
+                facade.before();
+                proceed = context.proceed();
+                facade.setDelegate(Future.class.cast(proceed)); // a non-Future result raises ClassCastException, handled below
+            } catch (final Exception e) { // NOTE(review): Errors are not caught here, so the facade gets no delegate in that case
+                final CompletableFuture<Object> failingFuture = new CompletableFuture<>();
+                failingFuture.completeExceptionally(e);
+                facade.setDelegate(failingFuture); // surface the failure through the facade rather than on the executor thread
+            }
+        }
+    }
 }
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadHandler.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadHandler.java
new file mode 100644
index 0000000..b54d170
--- /dev/null
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadHandler.java
@@ -0,0 +1,25 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.safeguard.impl.bulkhead;
+
+public interface BulkheadHandler { // handle on a bulkhead slot, stored in the invocation context data by BulkheadInterceptor
+    void release(); // presumably gives back the slot taken by acquire() - confirm against implementations
+
+    void acquire(); // presumably takes a slot or fails when none is left - confirm against implementations
+}
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
index 3a71118..9d51d3a 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
@@ -21,7 +21,6 @@
 import static java.util.Optional.ofNullable;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
-import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -29,6 +28,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.PreDestroy;
@@ -62,6 +62,32 @@
     public Object bulkhead(final InvocationContext context) throws Exception {
         final Map<Key, Model> models = cache.getModels();
         final Key key = new Key(context, cache.getUnwrappedCache().getUnwrappedCache());
+        final Model model = getModel(context, models, key);
+        if (model.disabled) {
+            return context.proceed();
+        }
+        final Map<String, Object> data = context.getContextData();
+        final String id = String.valueOf(data.get(IdGeneratorInterceptor.class.getName()));
+        if (model.useThreads) {
+            data.put(BulkheadInterceptor.class.getName() + ".self_" + id, this);
+            data.put(BulkheadInterceptor.class.getName() + ".model_" + id, model);
+            data.put(BulkheadInterceptor.class.getName() + "_" + id, model.pool);
+            data.put(Asynchronous.class.getName() + ".skip_" + id, Boolean.TRUE);
+            return around(context, id);
+        }
+
+        final BulkheadHandler handler = new ReleaseHandler(model);
+        data.put(BulkheadHandler.class.getName() + "_" + id, handler);
+        handler.acquire();
+        try {
+            return context.proceed();
+        } finally {
+            handler.release();
+            data.remove(BulkheadHandler.class.getName() + "_" + id);
+        }
+    }
+
+    private Model getModel(final InvocationContext context, final Map<Key, Model> models, final Key key) {
         Model model = models.get(key);
         if (model == null) {
             model = cache.create(context);
@@ -72,32 +98,7 @@
                 cache.postCreate(model, context);
             }
         }
-        if (model.disabled) {
-            return context.proceed();
-        }
-        if (model.useThreads) {
-            final Map<String, Object> data = context.getContextData();
-            final Object id = data.get(IdGeneratorInterceptor.class.getName());
-            data.put(BulkheadInterceptor.class.getName() + ".model_" + id, model);
-            data.put(BulkheadInterceptor.class.getName() + "_" + id, model.pool);
-            data.put(Asynchronous.class.getName() + ".skip_" + id, Boolean.TRUE);
-            return around(context);
-        } else {
-            if (!model.semaphore.tryAcquire()) {
-                model.callsRejected.inc();
-                throw new BulkheadException("No more permission available");
-            }
-            model.callsAccepted.inc();
-            model.concurrentCalls.incrementAndGet();
-            final long start = System.nanoTime();
-            try {
-                return context.proceed();
-            } finally {
-                model.executionDuration.update(System.nanoTime() - start);
-                model.semaphore.release();
-                model.concurrentCalls.decrementAndGet();
-            }
-        }
+        return model;
     }
 
     private Model getModel(final InvocationContext context) {
@@ -117,7 +118,7 @@
     }
 
     @Override
-    protected Executor getExecutor(final InvocationContext context) {
+    public Executor getExecutor(final InvocationContext context) {
         return Executor.class.cast(context.getContextData()
                   .get(BulkheadInterceptor.class.getName() + "_" + context.getContextData().get(IdGeneratorInterceptor.class.getName())));
     }
@@ -212,20 +213,28 @@
                     }
                 }, (r, executor) -> {
                     callsRejected.inc();
-                    throw new BulkheadException("Can't accept task " + r);
+
+                    final BulkheadException bulkheadException = new BulkheadException("Can't accept task " + r);
+                    if (BulkheadAsyncTask.class.isInstance(r)) { // normally yes
+                        final BulkheadAsyncTask bulkheadAsyncTask = BulkheadAsyncTask.class.cast(r);
+                        if (AsyncTask.class.isInstance(bulkheadAsyncTask.command)) { // todo: handle a way to always unwrap
+                            final AsyncTask asyncTask = AsyncTask.class.cast(bulkheadAsyncTask.command);
+                            final FutureWrapper<Object> facade = asyncTask.getFacade();
+                            facade.failed(bulkheadException);
+                            if (facade.getParent() != null) {
+                                facade.getParent().failed(bulkheadException);
+                            }
+                            return; // handled
+                        }
+                    }
+
+                    // else just throw it (sync case)
+                    throw bulkheadException;
                 }) {
                     @Override
                     public void execute(final Runnable command) {
                         final long submitted = System.nanoTime();
-                        super.execute(() -> {
-                            final long start = System.nanoTime();
-                            waitingDuration.update(start - submitted);
-                            try {
-                                command.run();
-                            } finally {
-                                executionDuration.update(System.nanoTime() - start);
-                            }
-                        });
+                        super.execute(new BulkheadAsyncTask(waitingDuration, executionDuration, command, submitted));
                         callsAccepted.inc();
                     }
                 };
@@ -238,6 +247,38 @@
         }
     }
 
+    /**
+     * Wraps a task handed to the bulkhead pool to record how long it waited before starting
+     * and how long it ran, each in its own histogram.
+     */
+    private static class BulkheadAsyncTask implements Runnable {
+        private final FaultToleranceMetrics.Histogram waitingDuration;
+        private final FaultToleranceMetrics.Histogram executionDuration;
+        private final Runnable command;
+        private final long submitted;
+
+        private BulkheadAsyncTask(final FaultToleranceMetrics.Histogram waitingDuration,
+                                  final FaultToleranceMetrics.Histogram executionDuration,
+                                  final Runnable command, final long submitted) {
+            this.waitingDuration = waitingDuration;
+            this.executionDuration = executionDuration;
+            this.command = command;
+            this.submitted = submitted;
+        }
+
+        @Override
+        public void run() {
+            final long start = System.nanoTime();
+            waitingDuration.update(start - submitted);
+            try {
+                command.run();
+            } finally {
+                // execution time goes to its own histogram, never to the waiting one
+                executionDuration.update(System.nanoTime() - start);
+            }
+        }
+    }
+
     @ApplicationScoped
     public static class Cache {
         private final Map<Key, Model> models = new ConcurrentHashMap<>();
@@ -306,4 +347,36 @@
             }
         }
     }
+
+    private static class ReleaseHandler implements BulkheadHandler { // permit holder for the semaphore (non-threaded) bulkhead
+        private final Model model;
+        private final AtomicBoolean released = new AtomicBoolean(true); // no permit is held until acquire() succeeds
+        private long start;
+
+        public ReleaseHandler(final Model model) {
+            this.model = model;
+        }
+
+        @Override
+        public void acquire() { // takes a permit without blocking or throws BulkheadException
+            if (!model.semaphore.tryAcquire()) {
+                model.callsRejected.inc();
+                throw new BulkheadException("No more permission available");
+            }
+            model.callsAccepted.inc();
+            model.concurrentCalls.incrementAndGet();
+            start = System.nanoTime();
+            released.set(false); // written last: re-arms release() once start is set
+        }
+
+        @Override
+        public void release() { // only the first call after a successful acquire() has an effect
+            if (!released.compareAndSet(false, true)) {
+                return;
+            }
+            model.executionDuration.update(System.nanoTime() - start);
+            model.semaphore.release();
+            model.concurrentCalls.decrementAndGet();
+        }
+    }
 }
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
index c3a85f9..013791b 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
@@ -77,6 +77,7 @@
 import org.eclipse.microprofile.faulttolerance.Fallback;
 import org.eclipse.microprofile.faulttolerance.Retry;
 import org.eclipse.microprofile.faulttolerance.Timeout;
+import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
 
 public class SafeguardExtension implements Extension {
     private boolean foundExecutor;
@@ -284,10 +285,12 @@
         final AnnotatedType<?> annotatedType = AnnotatedType.class.cast(type);
         try {
             annotatedType.getMethods().stream()
-                         .filter(it -> classHasMarker || it.isAnnotationPresent(marker))
-                         .map(m -> new MockInvocationContext(m.getJavaMember()))
-                         .forEach(contextConsumer);
+                    .filter(it -> classHasMarker || it.isAnnotationPresent(marker))
+                    .map(m -> new MockInvocationContext(m.getJavaMember()))
+                    .forEach(contextConsumer);
             return null;
+        } catch (final FaultToleranceDefinitionException ftde) {
+            throw ftde;
         } catch (final RuntimeException re) {
             return new DefinitionException(re);
         }
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java
index 3b5b5e9..21f75a1 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java
@@ -180,15 +180,10 @@
                 handler = fallbackHandler;
             } else {
                 try {
-                    final Method fallbackMethod = ofNullable(context.getTarget())
-                            .map(Object::getClass)
-                            .orElseGet(() -> Class.class.cast(context.getMethod().getDeclaringClass()))
-                            .getMethod(method, context.getMethod().getParameterTypes());
-                    if (!extension.toClass(context.getMethod()
-                                                  .getReturnType())
-                                  .isAssignableFrom(extension.toClass(fallbackMethod.getReturnType())) || !Arrays.equals(
-                            context.getMethod()
-                                   .getParameterTypes(), fallbackMethod.getParameterTypes())) {
+                    final Method fallbackMethod = getFallbackMethod(context, method);
+                    if (!extension.toClass(context.getMethod().getReturnType())
+                                .isAssignableFrom(extension.toClass(fallbackMethod.getReturnType())) ||
+                            !Arrays.equals(context.getMethod().getParameterTypes(), fallbackMethod.getParameterTypes())) {
                         throw new FaultToleranceDefinitionException("handler method does not match method: " + context.getMethod());
                     }
                     if (!fallbackMethod.isAccessible()) {
@@ -225,6 +220,58 @@
                 return handler.handle(context12);
             };
         }
+
+        /**
+         * Finds the fallback method named {@code method} with the same parameter types as the
+         * intercepted method: first on the target class and its superclasses, then on every
+         * interface of that hierarchy (super-interfaces included).
+         *
+         * @param context the invocation being intercepted.
+         * @param method the fallback method name.
+         * @return the matching method.
+         * @throws NoSuchMethodException when no class or interface of the hierarchy declares it.
+         */
+        private Method getFallbackMethod(final InvocationContext context, final String method) throws NoSuchMethodException {
+            final Class<?> rootClass = ofNullable(context.getTarget())
+                    .map(Object::getClass)
+                    .orElseGet(() -> Class.class.cast(context.getMethod().getDeclaringClass()));
+            final Class<?>[] parameterTypes = context.getMethod().getParameterTypes();
+            for (Class<?> current = rootClass; current != null; current = current.getSuperclass()) {
+                try {
+                    return current.getDeclaredMethod(method, parameterTypes);
+                } catch (final NoSuchMethodException nsme) {
+                    // not declared at this level, try the parent class
+                }
+            }
+            // getInterfaces() only lists directly implemented interfaces, so walk the superclasses too:
+            // this matters when the target is a subclass (e.g. a proxy) of the class implementing them
+            for (Class<?> current = rootClass; current != null; current = current.getSuperclass()) {
+                final Method found = findInInterfaces(current.getInterfaces(), method, parameterTypes);
+                if (found != null) {
+                    return found;
+                }
+            }
+            throw new NoSuchMethodException(method + " as fallback for " + context.getMethod());
+        }
+
+        /** Looks the method up on the given interfaces first, then on their super-interfaces; null when absent. */
+        private Method findInInterfaces(final Class<?>[] interfaces, final String method,
+                                        final Class<?>[] parameterTypes) {
+            for (final Class<?> api : interfaces) {
+                try {
+                    return api.getDeclaredMethod(method, parameterTypes);
+                } catch (final NoSuchMethodException nsme) {
+                    // keep looking on the next interface
+                }
+            }
+            for (final Class<?> api : interfaces) {
+                final Method found = findInInterfaces(api.getInterfaces(), method, parameterTypes);
+                if (found != null) {
+                    return found;
+                }
+            }
+            return null;
+        }
     }
 
     private interface EnrichedExecutionContext extends ExecutionContext {
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java
index d2c476d..675b4a5 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/AfterRetryInterceptor.java
@@ -31,6 +31,11 @@
 @Priority(Interceptor.Priority.PLATFORM_AFTER + 10)
 public class AfterRetryInterceptor extends BaseRetryInterceptor {
     @Override
+    protected boolean suspendBulkhead() {
+        return true;
+    }
+
+    @Override
     protected void executeFinalCounterAction(final Map<String, Object> contextData,
                                              final FaultToleranceMetrics.Counter counter) {
         // can be used to push it back to the before interceptor:
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
index 61c7ba8..69ed1f4 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
@@ -24,10 +24,13 @@
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Stream;
 
 import javax.enterprise.context.ApplicationScoped;
@@ -37,11 +40,14 @@
 
 import org.apache.safeguard.impl.annotation.AnnotationFinder;
 import org.apache.safeguard.impl.asynchronous.BaseAsynchronousInterceptor;
+import org.apache.safeguard.impl.bulkhead.BulkheadHandler;
+import org.apache.safeguard.impl.bulkhead.BulkheadInterceptor;
 import org.apache.safeguard.impl.cache.Key;
 import org.apache.safeguard.impl.cache.UnwrappedCache;
 import org.apache.safeguard.impl.config.ConfigurationMapper;
 import org.apache.safeguard.impl.interceptor.IdGeneratorInterceptor;
 import org.apache.safeguard.impl.metrics.FaultToleranceMetrics;
+import org.eclipse.microprofile.faulttolerance.Asynchronous;
 import org.eclipse.microprofile.faulttolerance.Retry;
 import org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException;
 import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
@@ -56,6 +62,134 @@
     public Object retry(final InvocationContext context) throws Exception {
         final Map<Key, Model> models = cache.getModels();
         final Key cacheKey = new Key(context, cache.getUnwrapped());
+        final Model model = getModel(context, models, cacheKey);
+        if (model.disabled) { // retry is disabled for this model: plain pass-through
+            return context.proceed();
+        }
+        final Map<String, Object> contextData = context.getContextData();
+        final String id = String.valueOf(contextData.get(IdGeneratorInterceptor.class.getName()));
+        final String contextKey = BaseRetryInterceptor.class.getName() + ".context_" + id;
+        Context retryContext = Context.class.cast(contextData.get(contextKey));
+        if (retryContext == null) { // no retry state stored under this id yet: create and publish it
+            retryContext = new Context(System.nanoTime() + model.maxDuration, model.maxRetries);
+            contextData.put(contextKey, retryContext);
+        }
+
+        // todo: find a more elegant way to reach the bulkhead from here, maybe an event
+        final BulkheadHandler bulkheadHandler = suspendBulkhead() ?
+                getSyncBulkheadHandler(contextData, id) : null;
+        final BulkheadInterceptor bulkheadInterceptor = suspendBulkhead() ?
+                getThreadedBulkHeadInterceptor(contextData, id) : null;
+
+        final AtomicBoolean mustAcquireBulkheadPerm = new AtomicBoolean(false); // shared with retryIteration and its async error handler
+        while (retryContext.counter >= 0) {
+            try {
+                return retryIteration(context, model, id, contextKey, retryContext,
+                        bulkheadInterceptor, bulkheadHandler, mustAcquireBulkheadPerm);
+            } catch (final Exception re) {
+                try {
+                    retryContext = handleException(contextData, contextKey, model, re, bulkheadHandler);
+                } catch (final Exception e) { // handleException threw: no further retry
+                    if (isAsync(context)) { // async call: report the original failure through a failed future instead of throwing
+                        final CompletableFuture<Object> future = new BaseAsynchronousInterceptor.ExtendedCompletableFuture<>();
+                        future.completeExceptionally(re);
+                        // drop the registered async error handler so it is not called yet another time
+                        contextData.remove(BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" + id);
+                        return future;
+                    }
+                    throw e;
+                }
+                mustAcquireBulkheadPerm.set(bulkheadHandler != null); // handleException released the handler, so the next attempt re-acquires it
+            }
+        }
+        throw new FaultToleranceException("Inaccessible normally, here for compilation");
+    }
+
+    private BulkheadHandler getSyncBulkheadHandler(final Map<String, Object> contextData, final Object id) {
+        return (BulkheadHandler) contextData.get(BulkheadHandler.class.getName() + "_" + id); // null when nothing is stored for this id
+    }
+
+    private BulkheadInterceptor getThreadedBulkHeadInterceptor(final Map<String, Object> contextData, final Object id) {
+        return (BulkheadInterceptor) contextData.get(BulkheadInterceptor.class.getName() + ".self_" + id); // null when nothing is stored for this id
+    }
+
+    private boolean isAsync(final InvocationContext context) {
+        final Map<String, Object> data = context.getContextData();
+        final String skipMarker = Asynchronous.class.getName() + ".skip_" + data.get(IdGeneratorInterceptor.class.getName());
+        return data.get(skipMarker) != null; // the asynchronous interceptor stores this marker for the invocation id
+    }
+
+    private Object retryIteration(final InvocationContext context, final Model model,
+                                  final String id, final String contextKey,
+                                  final Context retryContext,
+                                  final BulkheadInterceptor bulkheadInterceptor,
+                                  final BulkheadHandler bulkheadHandler,
+                                  final AtomicBoolean mustAcquireBulkheadPerm) throws Exception {
+        if (mustAcquireBulkheadPerm.get()) { // set after a failed attempt whose handling released the handler
+            bulkheadHandler.acquire();
+        }
+        final Object proceed = bulkheadInterceptor != null ?
+                resubmitThroughBulkhead(context, bulkheadInterceptor, id) : context.proceed();
+        final Map<String, Object> contextData = context.getContextData();
+        if (retryContext.counter == model.maxRetries) { // counter still at its initial value: no retry was consumed
+            executeFinalCounterAction(contextData, model.callsSucceededNotRetried);
+        } else {
+            executeFinalCounterAction(contextData, model.callsSucceededRetried);
+        }
+        if (BaseAsynchronousInterceptor.BaseFuture.class.isInstance(proceed)) {
+            final Model modelRef = model;
+            final String key = BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" + id;
+            contextData.put( // register the retry logic under a per-invocation key (presumably called back by the future on failure - confirm)
+                    key,
+                    (BaseAsynchronousInterceptor.ErrorHandler<Exception, Future<?>>) error -> {
+                        // todo: in async mode we shouldn't wait with "sleep" here - use either a SES or a CS/CF
+                        handleException(contextData, contextKey, modelRef, error, bulkheadHandler);
+                        mustAcquireBulkheadPerm.set(bulkheadHandler != null);
+                        final Context ctx = Context.class.cast(contextData.get(contextKey));
+                        if (ctx != null && ctx.counter >= 0) { // retries left: run another iteration with the same arguments
+                            return Future.class.cast(retryIteration(
+                                    context, model, id, contextKey, retryContext,
+                                    bulkheadInterceptor, bulkheadHandler, mustAcquireBulkheadPerm));
+                        }
+
+                        final BaseAsynchronousInterceptor.ExtendedCompletableFuture<Object> future = new BaseAsynchronousInterceptor.ExtendedCompletableFuture<>();
+                        future.completeExceptionally(error);
+                        contextData.remove(key);
+                        return future;
+                    });
+            if (CompletionStage.class.isInstance(proceed) && mustAcquireBulkheadPerm.get()) { // release only once the stage completes
+                return CompletionStage.class.cast(proceed)
+                        .whenComplete((r, e) -> bulkheadHandler.release());
+            }
+        }
+        if (mustAcquireBulkheadPerm.get()) { // every other result: release what was acquired at the top right away
+            bulkheadHandler.release();
+        }
+        return proceed;
+    }
+
+    private Object resubmitThroughBulkhead(final InvocationContext context,
+                                           final BulkheadInterceptor bulkheadInterceptor,
+                                           final String id) throws Exception {
+        // completion stages, and calls without a bulkhead pool entry, simply re-enter the bulkhead interceptor
+        final Map<String, Object> contextData = context.getContextData();
+        final String poolKey = BulkheadInterceptor.class.getName() + "_" + contextData.get(IdGeneratorInterceptor.class.getName());
+        if (CompletionStage.class.isAssignableFrom(context.getMethod().getReturnType()) || contextData.get(poolKey) == null) {
+            return bulkheadInterceptor.bulkhead(context);
+        }
+
+        // otherwise wrap the call in a new AsyncTask run on the bulkhead executor, linked to any stored wrapper
+        final String wrapperKey = BaseAsynchronousInterceptor.FutureWrapper.class.getName() + "_" + id;
+        final BaseAsynchronousInterceptor.FutureWrapper<?> previous = BaseAsynchronousInterceptor.FutureWrapper.class.cast(contextData.get(wrapperKey));
+        final BaseAsynchronousInterceptor.FutureWrapper result = new BaseAsynchronousInterceptor.FutureWrapper<>(contextData);
+        if (previous != null) {
+            result.setParent(previous);
+        }
+        bulkheadInterceptor.getExecutor(context).execute(new BaseAsynchronousInterceptor.AsyncTask(context, result));
+        return result;
+    }
+
+    private Model getModel(final InvocationContext context, final Map<Key, Model> models, final Key cacheKey) {
         Model model = models.get(cacheKey);
         if (model == null) {
             model = cache.create(context);
@@ -64,44 +198,18 @@
                 model = existing;
             }
         }
-        if (model.disabled) {
-            return context.proceed();
-        }
-        final Map<String, Object> contextData = context.getContextData();
-        final Object id = contextData.get(IdGeneratorInterceptor.class.getName());
-        final String contextKey = BaseRetryInterceptor.class.getName() + ".context_" + id;
-        Context retryContext = Context.class.cast(contextData.get(contextKey));
-        if (retryContext == null) {
-            retryContext = new Context(System.nanoTime() + model.maxDuration, model.maxRetries);
-            contextData.put(contextKey, retryContext);
-        }
-
-        while (retryContext.counter >= 0) { // todo: handle async if result is a Future or CompletionStage (weird no?)
-            try {
-                final Object proceed = context.proceed();
-                if (retryContext.counter == model.maxRetries) {
-                    executeFinalCounterAction(contextData, model.callsSucceededNotRetried);
-                } else {
-                    executeFinalCounterAction(contextData, model.callsSucceededRetried);
-                }
-                if (BaseAsynchronousInterceptor.BaseFuture.class.isInstance(proceed)) {
-                    final Model modelRef = model;
-                    contextData.put(BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" + id,
-                        (BaseAsynchronousInterceptor.ErrorHandler<Exception, Future<?>>) error -> {
-                            handleException(contextData, contextKey, modelRef, error);
-                            return Future.class.cast(context.proceed());
-                        });
-                }
-                return proceed;
-            } catch (final Exception re) {
-                retryContext = handleException(contextData, contextKey, model, re);
-            }
-        }
-        throw new FaultToleranceException("Inaccessible normally, here for compilation");
+        return model;
     }
 
+    protected abstract boolean suspendBulkhead();
+
     private Context handleException(final Map<String, Object> contextData, final String contextKey,
-                                    final Model modelRef, final Exception error) throws Exception {
+                                    final Model modelRef, final Exception error, final BulkheadHandler bulkheadHandler) throws Exception {
+        // release the bulkhead up front so it is not held while we pause before a retry
+        if (bulkheadHandler != null) {
+            bulkheadHandler.release();
+        }
+
         if (CircuitBreakerOpenException.class.isInstance(error)) {
             throw error;
         }
@@ -109,13 +217,14 @@
         // refresh the counter from the other interceptors
         final Context ctx = Context.class.cast(contextData.get(contextKey));
 
-        if (modelRef.abortOn(error) || (--ctx.counter) < 0 || System.nanoTime() >= ctx.maxEnd) {
+        if (modelRef.abortOn(error) || ctx.decr() < 0 || System.nanoTime() >= ctx.maxEnd) {
             executeFinalCounterAction(contextData, modelRef.callsFailed);
             throw error;
         }
         if (!modelRef.retryOn(error)) {
             throw error;
         }
+
         modelRef.retries.inc();
         final long pause = modelRef.nextPause();
         if (pause > 0) {
@@ -157,7 +266,7 @@
             this.disabled = disabled;
             this.abortOn = retry.abortOn();
             this.retryOn = retry.retryOn();
-            this.maxDuration = retry.delayUnit().getDuration().toNanos() * retry.maxDuration();
+            this.maxDuration = retry.durationUnit().getDuration().toNanos() * retry.maxDuration();
             this.maxRetries = retry.maxRetries();
             this.delay = retry.delayUnit().getDuration().toNanos() * retry.delay();
             this.jitter = retry.jitterDelayUnit().getDuration().toNanos() * retry.jitter();
@@ -197,8 +306,9 @@
 
         private long nextPause() {
             final ThreadLocalRandom random = ThreadLocalRandom.current();
-            return TimeUnit.NANOSECONDS
-                    .toMillis(min(maxDuration, max(0, ((random.nextBoolean() ? 1 : -1) * delay) + (jitter == 0 ? 0 : random.nextLong(jitter)))));
+            // the pause is delay +/- jitter (the jitter varies, not the sign of the delay), floored at 0
+            final long nextJitter = jitter == 0 ? 0 : random.nextLong(-jitter, jitter + 1);
+            return TimeUnit.NANOSECONDS.toMillis(min(maxDuration, max(0, delay + nextJitter)));
         }
     }
 
@@ -253,5 +363,9 @@
             this.maxEnd = maxEnd;
             this.counter = maxRetries;
         }
+
+        private synchronized int decr() { // NOTE(review): counter is also read without synchronization (e.g. the retry loop condition)
+            return --counter; // pre-decrement: callers get the value after decrementing
+        }
     }
 }
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java
index 06f7de6..90f889d 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BeforeRetryInterceptor.java
@@ -31,6 +31,11 @@
 @Priority(Interceptor.Priority.PLATFORM_AFTER)
 public class BeforeRetryInterceptor extends BaseRetryInterceptor {
     @Override
+    protected boolean suspendBulkhead() {
+        return false; // BaseRetryInterceptor#retry then skips the bulkhead handler/interceptor lookups
+    }
+
+    @Override
     protected void executeFinalCounterAction(final Map<String, Object> contextData,
                                              final FaultToleranceMetrics.Counter counter) {
         counter.inc();
diff --git a/safeguard-impl/src/test/resources/dev.xml b/safeguard-impl/src/test/resources/dev.xml
index bfbcf47..ff82e8d 100644
--- a/safeguard-impl/src/test/resources/dev.xml
+++ b/safeguard-impl/src/test/resources/dev.xml
@@ -19,11 +19,19 @@
 <suite name="Dev TCK Run" verbose="2" configfailurepolicy="continue" >
   <test name="TCK Run">
     <!-- all TCK -->
+    <!--
     <packages>
       <package name="org.eclipse.microprofile.fault.tolerance.tck.*" />
     </packages>
+    -->
     <!--
     to debug a single test (class or method), comment packages and add <classes>
     -->
+    <classes>
+      <class name="org.eclipse.microprofile.fault.tolerance.tck.bulkhead.BulkheadAsynchRetryTest">
+        <methods>
+        </methods>
+      </class>
+    </classes>
   </test>
 </suite>