more mp config handling + metrics support + enhancing of future support
diff --git a/README.md b/README.md
index 97588dd..0bd02c4 100644
--- a/README.md
+++ b/README.md
@@ -36,13 +36,8 @@
 ```xml
 <dependencies>
     <dependency>
-        <artifactId>safeguard-api</artifactId>
         <groupId>org.apache.geronimo.safeguard</groupId>
-        <version>1.0</version>
-    </dependency>
-    <dependency>
-        <artifactId>safeguard-impl</artifactId>
-        <groupId>org.apache.geronimo.safeguard</groupId>
+        **<artifactId>safeguard-impl</artifactId>**
         <version>1.0</version>
     </dependency>
 </dependencies>
@@ -52,27 +47,15 @@
 
 ### Integration
 
-The core of Safeguard is wrapped around an `ExecutionManager` which takes care of coordinating and storing the execution state of various methods.  It allows some configurability, but if you want to change it your best solution is to create an alternative of `ExecutionManager` with your customizations.  For instance, in an EE environment you may want to use a `ManagedScheduledExecutorService` which could be done:
+For `@Asynchronous` executor customization you can use:
 
 ```java
 @ApplicationScoped
-@Specializes
-@Priority(100)
-public class MyExecutionManagerProvider extends FailsafeExecutionManagerProvider{
+public class MyExecutionManagerProvider {
     @Resource
-    private ManagedScheduledExecutorService executorService;
     @Produces
-    @ApplicationScoped
-    public ExecutionManager createExecutionManager() {
-        FailsafeCircuitBreakerManager circuitBreakerManager = new FailsafeCircuitBreakerManager();
-        FailsafeRetryManager retryManager = new FailsafeRetryManager();
-        BulkheadManagerImpl bulkheadManager = new BulkheadManagerImpl();
-        DefaultExecutorServiceProvider executorServiceProvider = new DefaultExecutorServiceProvider(executorService);
-        ExecutionPlanFactory executionPlanFactory = new ExecutionPlanFactory(circuitBreakerManager, retryManager, bulkheadManager, mapper,
-                executorServiceProvider);
-        return FailsafeExecutionManager(MicroprofileAnnotationMapper.getInstance(), bulkheadManager, circuitBreakerManager, 
-                retryManager, executionPlanFactory, executorServiceProvider);
-    }
+    @Safeguard
+    private ManagedScheduledExecutorService executor;
 }
 
 
diff --git a/pom.xml b/pom.xml
index 912b7c6..f1ea78c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
     <microprofile-fault-tolerance.version>1.1.3</microprofile-fault-tolerance.version>
-    <owb.version>2.0.8</owb.version>
+    <owb.version>2.0.9-SNAPSHOT</owb.version>
     <arquillian.version>1.1.14.Final</arquillian.version>
     <arquillian-weld-embedded.version>2.0.0.Final</arquillian-weld-embedded.version>
     <cdi2-api.version>2.0</cdi2-api.version>
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/annotation/AnnotationFinder.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/annotation/AnnotationFinder.java
index 9fe2f28..6fab634 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/annotation/AnnotationFinder.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/annotation/AnnotationFinder.java
@@ -34,7 +34,11 @@
     private BeanManager manager;
 
     public <T extends Annotation> T findAnnotation(final Class<T> type, final InvocationContext context) {
-        Class<?> target = context.getTarget().getClass();
+        // normally we should use target but validations require the fallback
+        final Class<?> targetClass = ofNullable(context.getTarget())
+                .map(Object::getClass)
+                .orElseGet(() -> Class.class.cast(type));
+        Class<?> target = targetClass;
         while (target.getName().contains("$$")) {
             target = target.getSuperclass();
         }
@@ -44,6 +48,6 @@
                 .findFirst()
                 .map(m -> m.getAnnotation(type))
                 .orElseGet(() -> ofNullable(context.getMethod().getAnnotation(type))
-                        .orElseGet(() -> context.getTarget().getClass().getAnnotation(type)));
+                        .orElseGet(() -> targetClass.getAnnotation(type)));
     }
 }
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java
index fa375f2..d2ae29f 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/AsynchronousInterceptor.java
@@ -18,32 +18,75 @@
  */
 package org.apache.safeguard.impl.asynchronous;
 
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 import javax.annotation.Priority;
+import javax.enterprise.context.ApplicationScoped;
 import javax.inject.Inject;
 import javax.interceptor.AroundInvoke;
 import javax.interceptor.Interceptor;
 import javax.interceptor.InvocationContext;
 
+import org.apache.safeguard.impl.config.ConfigurationMapper;
 import org.apache.safeguard.impl.customizable.Safeguard;
+import org.apache.safeguard.impl.interceptor.IdGeneratorInterceptor;
 import org.eclipse.microprofile.faulttolerance.Asynchronous;
 
 @Interceptor
 @Asynchronous
-@Priority(Interceptor.Priority.PLATFORM_AFTER + 1)
+@Priority(Interceptor.Priority.PLATFORM_AFTER + 6)
 public class AsynchronousInterceptor extends BaseAsynchronousInterceptor {
     @Inject
-    @Safeguard
-    private Executor executor;
+    private Cache cache;
 
     @Override
     protected Executor getExecutor(final InvocationContext context) {
-        return executor;
+        return cache.getExecutor();
     }
 
     @AroundInvoke
     public Object async(final InvocationContext context) throws Exception {
+        final Map<Method, Boolean> models = cache.getEnabled();
+        Boolean enabled = models.get(context.getMethod());
+        if (enabled == null) {
+            enabled = cache.getMapper().isEnabled(context.getMethod(), Asynchronous.class);
+            models.putIfAbsent(context.getMethod(), enabled);
+        }
+        if (!enabled) {
+            return context.proceed();
+        }
+        final String key = Asynchronous.class.getName() + ".skip_" +
+                context.getContextData().get(IdGeneratorInterceptor.class.getName());
+        if (context.getContextData().putIfAbsent(key, Boolean.TRUE) != null) { // bulkhead or so handling threading
+            return context.proceed();
+        }
         return around(context);
     }
+
+    @ApplicationScoped
+    public static class Cache {
+        private final Map<Method, Boolean> enabled = new ConcurrentHashMap<>();
+
+        @Inject
+        @Safeguard
+        private Executor executor;
+
+        @Inject
+        private ConfigurationMapper mapper;
+
+        public ConfigurationMapper getMapper() {
+            return mapper;
+        }
+
+        public Executor getExecutor() {
+            return executor;
+        }
+
+        public Map<Method, Boolean> getEnabled() {
+            return enabled;
+        }
+    }
 }
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
index 52cb343..9626855 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/asynchronous/BaseAsynchronousInterceptor.java
@@ -18,9 +18,11 @@
  */
 package org.apache.safeguard.impl.asynchronous;
 
+import static java.util.Optional.ofNullable;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 import java.io.Serializable;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.CountDownLatch;
@@ -35,21 +37,12 @@
 import javax.interceptor.InvocationContext;
 
 import org.apache.safeguard.impl.interceptor.IdGeneratorInterceptor;
-import org.eclipse.microprofile.faulttolerance.Asynchronous;
 import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;
 
 public abstract class BaseAsynchronousInterceptor implements Serializable {
     protected abstract Executor getExecutor(InvocationContext context);
 
-    protected Object around(final InvocationContext context) throws Exception {
-        final String key = Asynchronous.class.getName() +
-                context.getContextData().get(IdGeneratorInterceptor.class.getName());
-        if (context.getContextData().get(key) != null) { // bulkhead or so handling threading
-            return context.proceed();
-        }
-
-        context.getContextData().put(key, "true");
-
+    protected Object around(final InvocationContext context) {
         final Class<?> returnType = context.getMethod().getReturnType();
         if (CompletionStage.class.isAssignableFrom(returnType)) {
             final ExtendedCompletableFuture<Object> future = newCompletableFuture(context);
@@ -61,7 +54,20 @@
                     stage.handle((r, e) -> {
                         future.after();
                         if (e != null) {
-                            future.completeExceptionally(e);
+                            ofNullable(getErrorHandler(context.getContextData()))
+                                .map(eh -> {
+                                    if (Exception.class.isInstance(e)) {
+                                        try {
+                                            eh.apply(Exception.class.cast(e));
+                                        } catch (final Exception e1) {
+                                            future.completeExceptionally(e1);
+                                        }
+                                    } else {
+                                        future.completeExceptionally(e);
+                                    }
+                                    return true;
+                                })
+                                .orElseGet(() -> future.completeExceptionally(e));
                         } else {
                             future.complete(r);
                         }
@@ -74,7 +80,7 @@
             return future;
         }
         if (Future.class.isAssignableFrom(returnType)) {
-            final FutureWrapper<Object> facade = newFuture(context);
+            final FutureWrapper<Object> facade = newFuture(context, context.getContextData());
             getExecutor(context).execute(() -> {
                 final Object proceed;
                 try {
@@ -94,31 +100,47 @@
                         "Should be Future or CompletionStage.");
     }
 
-    protected FutureWrapper<Object> newFuture(final InvocationContext context) {
-        return new FutureWrapper<>();
+    private static ErrorHandler<Exception, Future<?>> getErrorHandler(final Map<String, Object> contextData) {
+        return ErrorHandler.class.cast(
+                contextData.get(BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" +
+                contextData.get(IdGeneratorInterceptor.class.getName())));
+    }
+
+    protected FutureWrapper<Object> newFuture(final InvocationContext context,
+                                              final Map<String, Object> data) {
+        return new FutureWrapper<>(data);
     }
 
     protected ExtendedCompletableFuture<Object> newCompletableFuture(final InvocationContext context) {
         return new ExtendedCompletableFuture<>();
     }
 
-    public static class ExtendedCompletableFuture<T> extends CompletableFuture<T> {
-        public void before() {
-            // no-op
+    @FunctionalInterface
+    public interface ErrorHandler<A, B>  {
+        B apply(A a) throws Exception;
+    }
+
+    public interface BaseFuture {
+        default void before() {
+
         }
 
-        public void after() {
-            // no-op
+        default void after() {
+
         }
     }
 
-    public static class FutureWrapper<T> implements Future<T> {
+    public static class ExtendedCompletableFuture<T> extends CompletableFuture<T> implements BaseFuture {
+    }
+
+    public static class FutureWrapper<T> implements Future<T>, BaseFuture {
         private final AtomicReference<Future<T>> delegate = new AtomicReference<>();
         private final AtomicReference<Consumer<Future<T>>> cancelled = new AtomicReference<>();
         private final CountDownLatch latch = new CountDownLatch(1);
+        private final Map<String, Object> data;
 
-        public void before() {
-            // no-op
+        public FutureWrapper(final Map<String, Object> data) {
+            this.data = data;
         }
 
         public void setDelegate(final Future<T> delegate) {
@@ -126,6 +148,7 @@
             if (cancelledTask != null) {
                 cancelledTask.accept(delegate);
             }
+            after();
             this.delegate.set(delegate);
             this.latch.countDown();
         }
@@ -150,7 +173,14 @@
         @Override
         public T get() throws InterruptedException, ExecutionException {
             latch.await();
-            return delegate.get().get();
+            final Future<T> future = delegate.get();
+            try {
+                return future.get();
+            } catch (final ExecutionException ee) {
+                final Future<T> newFuture = onException(ee);
+                delegate.set(newFuture);
+                return newFuture.get();
+            }
         }
 
         @Override
@@ -161,7 +191,50 @@
             if (!latchWait) {
                 throw new TimeoutException();
             }
-            return delegate.get().get(unit.toNanos(timeout) - latchWaitDuration, NANOSECONDS);
+            try {
+                return delegate.get().get(unit.toNanos(timeout) - latchWaitDuration, NANOSECONDS);
+            } catch (final ExecutionException ee) {
+                delegate.set(onException(ee));
+                final long duration = unit.toNanos(timeout) - (System.nanoTime() - latchWaitDuration);
+                if (duration < 0) {
+                    throw new TimeoutException();
+                }
+                return delegate.get().get(duration, NANOSECONDS);
+            }
+        }
+
+        protected Future<T> onException(final Throwable cause) throws ExecutionException {
+            if (!Exception.class.isInstance(cause)) {
+                if (Error.class.isInstance(cause)) {
+                    throw Error.class.cast(cause);
+                }
+                throw new IllegalStateException(cause);
+            }
+            final Exception ex = Exception.class.cast(cause);
+            final ErrorHandler<Exception, Future<?>> handler = getErrorHandler(data);
+            if (handler != null) {
+                try {
+                    return (Future<T>) handler.apply(ex);
+                } catch (final Exception e) {
+                    if (ExecutionException.class.isInstance(e)) {
+                        throw ExecutionException.class.cast(e);
+                    }
+                    if (RuntimeException.class.isInstance(e)) {
+                        throw RuntimeException.class.cast(e);
+                    }
+                    if (Error.class.isInstance(e)) {
+                        throw Error.class.cast(e);
+                    }
+                    throw new IllegalStateException(e);
+                }
+            }
+            if (ExecutionException.class.isInstance(cause)) {
+                throw ExecutionException.class.cast(cause);
+            }
+            if (RuntimeException.class.isInstance(cause)) {
+                throw RuntimeException.class.cast(cause);
+            }
+            throw new IllegalStateException(cause); // unreachable - just for compiler
         }
     }
 }
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
index f2998b6..c779af1 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/bulkhead/BulkheadInterceptor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.safeguard.impl.bulkhead;
 
+import static java.util.Optional.ofNullable;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 import java.lang.reflect.Method;
@@ -25,8 +26,8 @@
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -40,6 +41,7 @@
 
 import org.apache.safeguard.impl.annotation.AnnotationFinder;
 import org.apache.safeguard.impl.asynchronous.BaseAsynchronousInterceptor;
+import org.apache.safeguard.impl.config.ConfigurationMapper;
 import org.apache.safeguard.impl.interceptor.IdGeneratorInterceptor;
 import org.apache.safeguard.impl.metrics.FaultToleranceMetrics;
 import org.eclipse.microprofile.faulttolerance.Asynchronous;
@@ -63,12 +65,19 @@
             final Model existing = models.putIfAbsent(context.getMethod(), model);
             if (existing != null) {
                 model = existing;
+            } else {
+                cache.postCreate(model, context);
             }
         }
+        if (model.disabled) {
+            return context.proceed();
+        }
         if (model.useThreads) {
-            final Object id = context.getContextData().get(IdGeneratorInterceptor.class.getName());
-            context.getContextData().put(BulkheadInterceptor.class.getName() + ".model_" + id, model.pool);
-            context.getContextData().put(BulkheadInterceptor.class.getName() + "_" + id, model.pool);
+            final Map<String, Object> data = context.getContextData();
+            final Object id = data.get(IdGeneratorInterceptor.class.getName());
+            data.put(BulkheadInterceptor.class.getName() + ".model_" + id, model);
+            data.put(BulkheadInterceptor.class.getName() + "_" + id, model.pool);
+            data.put(Asynchronous.class.getName() + ".skip_" + id, Boolean.TRUE);
             return around(context);
         } else {
             if (!model.semaphore.tryAcquire()) {
@@ -94,10 +103,12 @@
                 context.getContextData().get(IdGeneratorInterceptor.class.getName())));
     }
 
-    protected FutureWrapper<Object> newFuture(final InvocationContext context) {
-        return new ContextualFutureWrapper<>(getModel(context));
+    @Override
+    protected FutureWrapper<Object> newFuture(final InvocationContext context, final Map<String, Object> data) {
+        return new ContextualFutureWrapper<>(getModel(context), context.getContextData());
     }
 
+    @Override
     protected ExtendedCompletableFuture<Object> newCompletableFuture(final InvocationContext context) {
         return new ContextualCompletableFuture<>(getModel(context));
     }
@@ -105,7 +116,7 @@
     @Override
     protected Executor getExecutor(final InvocationContext context) {
         return Executor.class.cast(context.getContextData()
-                  .get(BulkheadInterceptor.class.getName() + context.getContextData().get(IdGeneratorInterceptor.class.getName())));
+                  .get(BulkheadInterceptor.class.getName() + "_" + context.getContextData().get(IdGeneratorInterceptor.class.getName())));
     }
 
     private static class ContextualCompletableFuture<T> extends ExtendedCompletableFuture<T> {
@@ -129,7 +140,8 @@
     private static class ContextualFutureWrapper<T> extends FutureWrapper<T> {
         private final Model model;
 
-        private ContextualFutureWrapper(final Model model) {
+        private ContextualFutureWrapper(final Model model, final Map<String, Object> data) {
+            super(data);
             this.model = model;
         }
 
@@ -139,13 +151,13 @@
         }
 
         @Override
-        public void setDelegate(final Future<T> delegate) {
+        public void after() {
             model.concurrentCalls.decrementAndGet();
-            super.setDelegate(delegate);
         }
     }
 
     static class Model {
+        private final boolean disabled;
         private final int value;
         private final int waitingQueue;
         private final boolean useThreads;
@@ -158,11 +170,13 @@
         private final FaultToleranceMetrics.Histogram executionDuration;
         private final FaultToleranceMetrics.Histogram waitingDuration;
 
-        private Model(final Bulkhead bulkhead, final boolean useThreads,
+        private Model(final boolean disabled, final InvocationContext context,
+                      final Bulkhead bulkhead, final boolean useThreads,
                       final FaultToleranceMetrics.Counter callsAccepted,
                       final FaultToleranceMetrics.Counter callsRejected,
                       final FaultToleranceMetrics.Histogram executionDuration,
                       final FaultToleranceMetrics.Histogram waitingDuration) {
+            this.disabled = disabled;
             this.value = bulkhead.value();
             if (this.value <= 0) {
                 throw new FaultToleranceDefinitionException("Invalid value in @Bulkhead: " + value);
@@ -181,7 +195,22 @@
             this.useThreads = useThreads;
             if (this.useThreads) { // important: use a pool dedicated for that concern and not a reusable one
                 this.workQueue = new ArrayBlockingQueue<>(waitingQueue);
-                this.pool = new ThreadPoolExecutor(value, value, 0L, MILLISECONDS, workQueue) {
+                this.pool = new ThreadPoolExecutor(value, value, 0L, MILLISECONDS, workQueue, new ThreadFactory() {
+                    private final ThreadGroup group = ofNullable(System.getSecurityManager())
+                            .map(SecurityManager::getThreadGroup)
+                            .orElseGet(() -> Thread.currentThread().getThreadGroup());
+                    private final String prefix = "org.apache.geronimo.safeguard.bulkhead@" +
+                            System.identityHashCode(this) + "[" + context.getMethod() + "]-";
+                    private final AtomicLong counter = new AtomicLong();
+
+                    @Override
+                    public Thread newThread(final Runnable r) {
+                        return new Thread(group, r, prefix + counter.incrementAndGet());
+                    }
+                }, (r, executor) -> {
+                    callsRejected.inc();
+                    throw new BulkheadException("Can't accept task " + r);
+                }) {
                     @Override
                     public void execute(final Runnable command) {
                         final long submitted = System.nanoTime();
@@ -197,10 +226,6 @@
                         callsAccepted.inc();
                     }
                 };
-                this.pool.setRejectedExecutionHandler((r, executor) -> {
-                    callsRejected.inc();
-                    throw new BulkheadException("Can't accept task " + r);
-                });
                 this.semaphore = null;
             } else {
                 this.workQueue = null;
@@ -220,6 +245,9 @@
         @Inject
         private FaultToleranceMetrics metrics;
 
+        @Inject
+        private ConfigurationMapper configurationMapper;
+
         @PreDestroy
         private void destroy() {
             models.values().stream().filter(m -> m.pool != null).forEach(m -> m.pool.shutdownNow());
@@ -232,8 +260,7 @@
         public Model create(final InvocationContext context) {
             final boolean useThreads = finder.findAnnotation(Asynchronous.class, context) != null;
 
-            final String metricsNameBase = "ft." + context.getMethod().getDeclaringClass().getCanonicalName() + "."
-                    + context.getMethod().getName() + ".bulkhead.";
+            final String metricsNameBase = getMetricsNameBase(context);
             final FaultToleranceMetrics.Counter callsAccepted = metrics.counter(metricsNameBase + "callsAccepted.total",
                     "Number of calls accepted by the bulkhead");
             final FaultToleranceMetrics.Counter callsRejected = metrics.counter(metricsNameBase + "callsRejected.total",
@@ -248,18 +275,25 @@
                 waitingDuration = null;
             }
 
-            final Model model = new Model(
-                    finder.findAnnotation(Bulkhead.class, context), useThreads,
-                    callsAccepted, callsRejected, executionDuration, waitingDuration);
+            return new Model(
+                    !configurationMapper.isEnabled(context.getMethod(), Bulkhead.class), context,
+                    configurationMapper.map(finder.findAnnotation(Bulkhead.class, context), context.getMethod(), Bulkhead.class),
+                    useThreads, callsAccepted, callsRejected, executionDuration, waitingDuration);
+        }
 
+        private String getMetricsNameBase(InvocationContext context) {
+            return "ft." + context.getMethod().getDeclaringClass().getCanonicalName() + "."
+                    + context.getMethod().getName() + ".bulkhead.";
+        }
+
+        public void postCreate(final Model model, final InvocationContext context) {
+            final String metricsNameBase = getMetricsNameBase(context);
             metrics.gauge(metricsNameBase + "concurrentExecutions", "Number of currently running executions",
                     "none", model.concurrentCalls::get);
             if (model.workQueue != null) {
                 metrics.gauge(metricsNameBase + "waitingQueue.population",
                         "Number of executions currently waiting in the queue", "none", () -> (long) model.workQueue.size());
             }
-
-            return model;
         }
     }
 }
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
index 132180e..42f1cf1 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/cdi/SafeguardExtension.java
@@ -1,16 +1,25 @@
 package org.apache.safeguard.impl.cdi;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Optional.ofNullable;
 import static java.util.stream.Collectors.toList;
 
 import java.lang.annotation.Annotation;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 import java.util.stream.Stream;
 
 import javax.annotation.Priority;
@@ -19,14 +28,19 @@
 import javax.enterprise.inject.Any;
 import javax.enterprise.inject.Default;
 import javax.enterprise.inject.spi.AfterBeanDiscovery;
+import javax.enterprise.inject.spi.AfterDeploymentValidation;
+import javax.enterprise.inject.spi.Annotated;
 import javax.enterprise.inject.spi.AnnotatedMethod;
 import javax.enterprise.inject.spi.AnnotatedType;
 import javax.enterprise.inject.spi.BeforeBeanDiscovery;
+import javax.enterprise.inject.spi.CDI;
+import javax.enterprise.inject.spi.DefinitionException;
 import javax.enterprise.inject.spi.Extension;
 import javax.enterprise.inject.spi.ProcessAnnotatedType;
 import javax.enterprise.inject.spi.ProcessBean;
 import javax.enterprise.inject.spi.WithAnnotations;
 import javax.interceptor.Interceptor;
+import javax.interceptor.InvocationContext;
 
 import org.apache.safeguard.impl.asynchronous.AsynchronousInterceptor;
 import org.apache.safeguard.impl.bulkhead.BulkheadInterceptor;
@@ -36,6 +50,7 @@
 import org.apache.safeguard.impl.fallback.FallbackInterceptor;
 import org.apache.safeguard.impl.metrics.FaultToleranceMetrics;
 import org.apache.safeguard.impl.retry.AfterRetryInterceptor;
+import org.apache.safeguard.impl.retry.BaseRetryInterceptor;
 import org.apache.safeguard.impl.retry.BeforeRetryInterceptor;
 import org.apache.safeguard.impl.timeout.TimeoutInterceptor;
 import org.eclipse.microprofile.faulttolerance.Asynchronous;
@@ -49,6 +64,14 @@
     private boolean foundExecutor;
     private GeronimoFaultToleranceConfig config;
     private Integer priorityBase;
+    private final Collection<Annotated> beansToValidate = new ArrayList<>();
+
+    // for validation and prepopulation of the cache at startup - note: should we enable to deactivate it?
+    private TimeoutInterceptor.Cache timeoutCache;
+    private BulkheadInterceptor.Cache bulkHeadCache;
+    private CircuitBreakerInterceptor.Cache circuitBreakerCache;
+    private FallbackInterceptor.Cache fallbackCache;
+    private BaseRetryInterceptor.Cache retryCache;
 
     void grabInterceptorPriority(@Observes final BeforeBeanDiscovery beforeBeanDiscovery) {
         config = GeronimoFaultToleranceConfig.create();
@@ -93,7 +116,7 @@
             Bulkhead.class, CircuitBreaker.class,
             Fallback.class, Retry.class, Timeout.class
     }) final ProcessAnnotatedType<?> processAnnotatedType) {
-        if (processAnnotatedType.getAnnotatedType().getJavaClass().getName().startsWith("org.apache.safeguard.impl")) {
+        if (processAnnotatedType.getAnnotatedType().getJavaClass().getName().startsWith("org.apache.safeguard.impl.")) {
             return;
         }
         if (faultToleranceAnnotations().anyMatch(it -> processAnnotatedType.getAnnotatedType().isAnnotationPresent(it))) {
@@ -109,10 +132,17 @@
     }
 
     void onBean(@Observes final ProcessBean<?> bean) {
-        if (bean.getBean().getQualifiers().stream().anyMatch(it -> it.annotationType() == Safeguard.class)
+        if (isSafeguardBean(bean)
             && bean.getBean().getTypes().stream().anyMatch(it -> Executor.class.isAssignableFrom(toClass(it)))) {
             foundExecutor = true;
         }
+
+        if (AnnotatedType.class.isInstance(bean.getAnnotated())) {
+            final AnnotatedType<?> at = AnnotatedType.class.cast(bean.getAnnotated());
+            if (at.getMethods().stream().anyMatch(m -> m.isAnnotationPresent(SafeguardEnabled.class))) {
+                beansToValidate.add(bean.getAnnotated());
+            }
+        }
     }
 
     void addMissingBeans(@Observes final AfterBeanDiscovery afterBeanDiscovery) {
@@ -131,7 +161,7 @@
                 .beanClass(FaultToleranceMetrics.class)
                 .qualifiers(Default.Literal.INSTANCE, Any.Literal.INSTANCE)
                 .scope(ApplicationScoped.class)
-                .createWith(c -> FaultToleranceMetrics.create());
+                .createWith(c -> FaultToleranceMetrics.create(config));
 
         if (!foundExecutor) {
             afterBeanDiscovery.addBean()
@@ -139,12 +169,115 @@
                     .types(Executor.class, Object.class)
                     .beanClass(Executor.class)
                     .qualifiers(Safeguard.Literal.INSTANCE, Any.Literal.INSTANCE)
-                    .createWith(c -> Executors.newCachedThreadPool())
+                    .createWith(c -> Executors.newCachedThreadPool(new ThreadFactory() {
+                        private final ThreadGroup group = ofNullable(System.getSecurityManager())
+                                .map(SecurityManager::getThreadGroup)
+                                .orElseGet(() -> Thread.currentThread().getThreadGroup());
+                        private final String prefix = "org.apache.geronimo.safeguard.asynchronous@" +
+                                System.identityHashCode(this);
+                        private final AtomicLong counter = new AtomicLong();
+
+                        @Override
+                        public Thread newThread(final Runnable r) {
+                            return new Thread(group, r, prefix + counter.incrementAndGet());
+                        }
+                    }))
                     .scope(ApplicationScoped.class)
                     .destroyWith((e, c) -> ExecutorService.class.cast(e).shutdownNow());
         }
     }
 
+    void addDefinitionErrors(@Observes final AfterDeploymentValidation validation) {
+        beansToValidate.stream()
+                       .map(this::validate)
+                       .filter(Objects::nonNull)
+                       .forEach(validation::addDeploymentProblem);
+        beansToValidate.clear();
+    }
+
+    private <T> T getInstance(final Class<T> cache) {
+        return CDI.current().select(cache).get();
+    }
+
+    private Throwable validate(final Annotated annotated) {
+        { // timeout
+            final Throwable throwable = validate(Timeout.class, annotated, context -> {
+                if (timeoutCache == null) {
+                    timeoutCache = getInstance(TimeoutInterceptor.Cache.class);
+                }
+                timeoutCache.create(context);
+            });
+            if (throwable != null) {
+                return throwable;
+            }
+        }
+        { // bulkhead
+            final Throwable throwable = validate(Bulkhead.class, annotated, context -> {
+                if (bulkHeadCache == null) {
+                    bulkHeadCache = getInstance(BulkheadInterceptor.Cache.class);
+                }
+                bulkHeadCache.create(context);
+            });
+            if (throwable != null) {
+                return throwable;
+            }
+        }
+        { // circuit breaker
+            final Throwable throwable = validate(CircuitBreaker.class, annotated, context -> {
+                if (circuitBreakerCache == null) {
+                    circuitBreakerCache = getInstance(CircuitBreakerInterceptor.Cache.class);
+                }
+                circuitBreakerCache.create(context);
+            });
+            if (throwable != null) {
+                return throwable;
+            }
+        }
+        { // fallback
+            final Throwable throwable = validate(Fallback.class, annotated, context -> {
+                if (fallbackCache == null) {
+                    fallbackCache = getInstance(FallbackInterceptor.Cache.class);
+                }
+                fallbackCache.create(context);
+            });
+            if (throwable != null) {
+                return throwable;
+            }
+        }
+        { // retry
+            final Throwable throwable = validate(Retry.class, annotated, context -> {
+                if (retryCache == null) {
+                    retryCache = getInstance(BaseRetryInterceptor.Cache.class);
+                }
+                retryCache.create(context);
+            });
+            if (throwable != null) {
+                return throwable;
+            }
+        }
+        return null;
+    }
+
+    private Throwable validate(final Class<? extends Annotation> marker,
+                               final Annotated type,
+                               final Consumer<InvocationContext> contextConsumer) {
+        final boolean classHasMarker = type.isAnnotationPresent(marker);
+        final AnnotatedType<?> annotatedType = AnnotatedType.class.cast(type);
+        try {
+            annotatedType.getMethods().stream()
+                         .filter(it -> classHasMarker || it.isAnnotationPresent(marker))
+                         .map(m -> new MockInvocationContext(m.getJavaMember()))
+                         .forEach(contextConsumer);
+            return null;
+        } catch (final RuntimeException re) {
+            return new DefinitionException(re);
+        }
+    }
+
+    private boolean isSafeguardBean(final ProcessBean<?> bean) {
+        return bean.getBean().getQualifiers().stream().anyMatch(it -> it.annotationType() == Safeguard.class);
+    }
+
     private void customizePriority(final ProcessAnnotatedType<?> type) {
         if (priorityBase == null) {
             return;
@@ -176,4 +309,54 @@
         return Stream.of(Asynchronous.class, Bulkhead.class, CircuitBreaker.class,
                 Fallback.class, Retry.class, Timeout.class);
     }
+
+    private static class MockInvocationContext implements InvocationContext {
+        private static final Object[] NO_PARAM = new Object[0];
+
+        private final Method method;
+
+        private MockInvocationContext(final Method m) {
+            this.method = m;
+        }
+
+        @Override
+        public Object getTarget() {
+            return null;
+        }
+
+        @Override
+        public Method getMethod() {
+            return method;
+        }
+
+        @Override
+        public Constructor<?> getConstructor() {
+            return null;
+        }
+
+        @Override
+        public Object[] getParameters() {
+            return NO_PARAM;
+        }
+
+        @Override
+        public void setParameters(final Object[] parameters) {
+            // no-op
+        }
+
+        @Override
+        public Map<String, Object> getContextData() {
+            return emptyMap();
+        }
+
+        @Override
+        public Object proceed() {
+            return null;
+        }
+
+        @Override
+        public Object getTimer() {
+            return null;
+        }
+    }
 }
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
index 218614e..798596e 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/circuitbreaker/CircuitBreakerInterceptor.java
@@ -60,6 +60,9 @@
                 circuitBreaker = existing;
             }
         }
+        if (circuitBreaker.disabled) {
+            return context.proceed();
+        }
 
         final CheckResult state = circuitBreaker.performStateCheck(CheckType.READ_ONLY);
         if (state == CheckResult.OPEN) {
@@ -187,8 +190,8 @@
             final Class<? extends Throwable>[] failOn = definition.failOn();
 
             final double failureRatio = definition.failureRatio();
-            if (failureRatio < 0) {
-                throw new FaultToleranceDefinitionException("CircuitBreaker failure ratio can't be < 0");
+            if (failureRatio < 0 || failureRatio > 1) {
+                throw new FaultToleranceDefinitionException("CircuitBreaker failure ratio can't be < 0 and > 1");
             }
 
             final int volumeThreshold = definition.requestVolumeThreshold();
@@ -197,14 +200,16 @@
             }
 
             final int successThreshold = definition.successThreshold();
-            if (successThreshold < 0) {
-                throw new FaultToleranceDefinitionException("CircuitBreaker success threshold can't be < 0");
+            if (successThreshold <= 0) {
+                throw new FaultToleranceDefinitionException("CircuitBreaker success threshold can't be <= 0");
             }
 
             final String metricsNameBase = "ft." + context.getMethod().getDeclaringClass().getCanonicalName() + "."
                     + context.getMethod().getName() + ".circuitbreaker.";
 
-            final CircuitBreakerImpl circuitBreaker = new CircuitBreakerImpl(volumeThreshold, delay, successThreshold,
+            final CircuitBreakerImpl circuitBreaker = new CircuitBreakerImpl(
+                    !mapper.isEnabled(context.getMethod(), CircuitBreaker.class),
+                    volumeThreshold, delay, successThreshold,
                     failOn, failureRatio, metrics.counter(metricsNameBase + "callsSucceeded.total",
                     "Number of calls allowed to run by the circuit breaker that returned successfully"),
                     metrics.counter(metricsNameBase + "callsFailed.total",
@@ -232,6 +237,7 @@
         private static final Boolean[] FIRST_SUCCESS_ARRAY = {Boolean.TRUE};
         private static final Boolean[] FIRST_FAILURE_ARRAY = {Boolean.FALSE};
 
+        private final boolean disabled;
         private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
         private final AtomicReference<CheckIntervalData> checkIntervalData;
         private final int volumeThreshold;
@@ -248,12 +254,14 @@
         private final AtomicLong closedDuration = new AtomicLong();
         private final FaultToleranceMetrics.Counter opened;
 
-        CircuitBreakerImpl(final int volumeThreshold, final long delay, final int successThreshold,
+        CircuitBreakerImpl(final boolean disabled,
+                           final int volumeThreshold, final long delay, final int successThreshold,
                            final Class<? extends Throwable>[] failOn, final double failureRatio,
                            final FaultToleranceMetrics.Counter callsSucceeded,
                            final FaultToleranceMetrics.Counter callsFailed,
                            final FaultToleranceMetrics.Counter callsPrevented,
                            final FaultToleranceMetrics.Counter opened) {
+            this.disabled = disabled;
             this.checkIntervalData = new AtomicReference<>(new CheckIntervalData(volumeThreshold, EMPTY_ARRAY, 0));
             this.volumeThreshold = volumeThreshold;
             this.delay = delay;
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/config/ConfigurationMapper.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/config/ConfigurationMapper.java
index 3789f1f..6e85b37 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/config/ConfigurationMapper.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/config/ConfigurationMapper.java
@@ -27,6 +27,7 @@
 import java.lang.reflect.Proxy;
 import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 import javax.enterprise.context.ApplicationScoped;
@@ -34,6 +35,8 @@
 import javax.enterprise.inject.spi.BeanManager;
 import javax.inject.Inject;
 
+import org.eclipse.microprofile.faulttolerance.Fallback;
+
 @ApplicationScoped
 public class ConfigurationMapper {
     @Inject
@@ -52,26 +55,50 @@
         }));
     }
 
+    public <T extends Annotation> boolean isEnabled(final Method method, final Class<T> api) {
+        final boolean methodLevel = isDefinedAtMethodLevel(method, api);
+        final Supplier<Boolean> globalEvaluator = () ->
+            ofNullable(findClassConfiguration(api, method, "enabled"))
+                    .map(Boolean::parseBoolean)
+                    .orElseGet(() -> ofNullable(config.read(String.format("%s/%s", api.getSimpleName(), "enabled")))
+                        .map(Boolean::parseBoolean)
+                        .orElseGet(() -> Fallback.class == api ?
+                            true :
+                            ofNullable(config.read("MP_Fault_Tolerance_NonFallback_Enabled"))
+                                    .map(Boolean::parseBoolean)
+                                    .orElse(true)));
+        if (methodLevel) {
+            return ofNullable(findMethodConfiguration(api, method, "enabled"))
+                    .map(Boolean::parseBoolean)
+                    .orElseGet(globalEvaluator);
+        }
+        return globalEvaluator.get();
+    }
+
     private <T extends Annotation> Object findConfiguredValue(final T instance, final Class<T> api,
                                                               final Method sourceMethod,
                                                               final Method proxyMethod, final Object[] args) {
-        final AnnotatedType<?> selected = beanManager.createAnnotatedType(sourceMethod.getDeclaringClass());
-        final boolean methodLevel = selected.getMethods().stream()
-                .filter(it -> it.getJavaMember().getName().equals(sourceMethod.getName()) &&
-                        Arrays.equals(it.getJavaMember().getParameterTypes(), sourceMethod.getParameterTypes()))
-                .anyMatch(it -> it.isAnnotationPresent(api));
-        if (methodLevel) {
-            return ofNullable(findDefaultConfiguration(proxyMethod))
+        final boolean methodLevel = isDefinedAtMethodLevel(sourceMethod, api);
+        final Supplier<Object> classEvaluator = () ->
+            ofNullable(findDefaultConfiguration(proxyMethod, proxyMethod.getName()))
                     .map(v -> coerce(v, proxyMethod.getReturnType()))
-                    .orElseGet(() -> ofNullable(findMethodConfiguration(api, sourceMethod, proxyMethod))
+                    .orElseGet(() -> ofNullable(findClassConfiguration(api, sourceMethod, proxyMethod.getName()))
                             .map(v -> coerce(v, proxyMethod.getReturnType()))
                             .orElseGet(() -> getReflectionConfig(instance, proxyMethod, args)));
+        if (methodLevel) {
+            return ofNullable(findMethodConfiguration(api, sourceMethod, proxyMethod.getName()))
+                        .map(v -> coerce(v, proxyMethod.getReturnType()))
+                        .orElseGet(classEvaluator::get);
         }
-        return ofNullable(findDefaultConfiguration(proxyMethod))
-                .map(v -> coerce(v, proxyMethod.getReturnType()))
-                .orElseGet(() -> ofNullable(findClassConfiguration(api, sourceMethod, proxyMethod))
-                    .map(v -> coerce(v, proxyMethod.getReturnType()))
-                    .orElseGet(() -> getReflectionConfig(instance, proxyMethod, args)));
+        return classEvaluator.get();
+    }
+
+    private <T extends Annotation> boolean isDefinedAtMethodLevel(final Method method, final Class<T> api) {
+        final AnnotatedType<?> selected = beanManager.createAnnotatedType(method.getDeclaringClass());
+        return selected.getMethods().stream()
+                       .filter(it -> it.getJavaMember().getName().equals(method.getName()) &&
+                               Arrays.equals(it.getJavaMember().getParameterTypes(), method.getParameterTypes()))
+                       .anyMatch(it -> it.isAnnotationPresent(api));
     }
 
     private <T extends Annotation> Object getReflectionConfig(final T instance,
@@ -86,18 +113,18 @@
         }
     }
 
-    private String findDefaultConfiguration(final Method api) {
-        return config.read(String.format("%s/%s", api.getDeclaringClass().getSimpleName(), api.getName()));
+    private String findDefaultConfiguration(final Method api, final String methodName) {
+        return config.read(String.format("%s/%s", api.getDeclaringClass().getSimpleName(), methodName));
     }
 
-    private <T extends Annotation> String findClassConfiguration(final Class<T> api, final Method beanMethod, final Method apiMethod) {
+    private <T extends Annotation> String findClassConfiguration(final Class<T> api, final Method beanMethod, final String apiMethod) {
         return config.read(String.format("%s/%s/%s",
-                beanMethod.getDeclaringClass().getName(), api.getSimpleName(), apiMethod.getName()));
+                beanMethod.getDeclaringClass().getName(), api.getSimpleName(), apiMethod));
     }
 
-    private <T extends Annotation> String findMethodConfiguration(final Class<T> api, final Method beanMethod, final Method apiMethod) {
+    private <T extends Annotation> String findMethodConfiguration(final Class<T> api, final Method beanMethod, final String apiMethod) {
         return config.read(String.format("%s/%s/%s/%s",
-                beanMethod.getDeclaringClass().getName(), beanMethod.getName(), api.getSimpleName(), apiMethod.getName()));
+                beanMethod.getDeclaringClass().getName(), beanMethod.getName(), api.getSimpleName(), apiMethod));
     }
 
     private Object coerce(final String raw, final Class<?> expected) {
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java
index c4b12e6..5aabb8d 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/fallback/FallbackInterceptor.java
@@ -18,6 +18,8 @@
  */
 package org.apache.safeguard.impl.fallback;
 
+import static java.util.Optional.ofNullable;
+
 import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -52,7 +54,7 @@
 
 // @Fallback - added through the extension since the @Target doesnt allow it
 @Interceptor
-@Priority(Interceptor.Priority.PLATFORM_AFTER)
+@Priority(Interceptor.Priority.PLATFORM_AFTER + 2)
 public class FallbackInterceptor implements Serializable {
     @Inject
     private Cache cache;
@@ -123,6 +125,19 @@
         }
 
         public FallbackHandler<?> create(final InvocationContext context) {
+            if (!mapper.isEnabled(context.getMethod(), Fallback.class)) {
+                return (FallbackHandler<Object>) context13 -> {
+                    final Throwable failure = context13.getFailure();
+                    if (RuntimeException.class.isInstance(failure)) {
+                        throw RuntimeException.class.cast(failure);
+                    }
+                    if (Error.class.isInstance(failure)) {
+                        throw Error.class.cast(failure);
+                    }
+                    throw new IllegalStateException(failure);
+                };
+            }
+
             final Fallback fallback = mapper.map(finder.findAnnotation(Fallback.class, context), context.getMethod(), Fallback.class);
             final Class<? extends FallbackHandler<?>> value = fallback.value();
             final String method = fallback.fallbackMethod();
@@ -151,9 +166,10 @@
                 handler = fallbackHandler;
             } else {
                 try {
-                    final Method fallbackMethod = context.getTarget()
-                                                         .getClass()
-                                                         .getMethod(method, context.getMethod().getParameterTypes());
+                    final Method fallbackMethod = ofNullable(context.getTarget())
+                            .map(Object::getClass)
+                            .orElseGet(() -> Class.class.cast(context.getMethod().getDeclaringClass()))
+                            .getMethod(method, context.getMethod().getParameterTypes());
                     if (!extension.toClass(context.getMethod()
                                                   .getReturnType())
                                   .isAssignableFrom(extension.toClass(fallbackMethod.getReturnType())) || !Arrays.equals(
@@ -166,13 +182,19 @@
                     }
                     handler = (FallbackHandler<Object>) context1 -> {
                         try {
-                            return fallbackMethod.invoke(EnrichedExecutionContext.class.cast(context1)
-                                                                                       .getTarget(),
-                                    context1.getParameters());
+                            return fallbackMethod.invoke(
+                                    EnrichedExecutionContext.class.cast(context1).getTarget(), context1.getParameters());
                         } catch (final IllegalAccessException e) {
                             throw new IllegalStateException(e);
                         } catch (final InvocationTargetException e) {
-                            throw new IllegalStateException(e.getTargetException());
+                            final Throwable targetException = e.getTargetException();
+                            if (RuntimeException.class.isInstance(targetException)) {
+                                throw RuntimeException.class.cast(targetException);
+                            }
+                            if (Error.class.isInstance(targetException)) {
+                                throw Error.class.cast(targetException);
+                            }
+                            throw new IllegalStateException(targetException);
                         }
                     };
                 } catch (final NoSuchMethodException e) {
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/interceptor/IdGeneratorInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/interceptor/IdGeneratorInterceptor.java
index 58dd561..0e3eacf 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/interceptor/IdGeneratorInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/interceptor/IdGeneratorInterceptor.java
@@ -68,12 +68,6 @@
         } catch (final Exception | Error e) {
             methodCounters.failed.inc();
             throw e;
-        } finally {
-            if (old != null) {
-                context.getContextData().put(IdGeneratorInterceptor.class.getName(), old);
-            } else {
-                context.getContextData().remove(IdGeneratorInterceptor.class.getName());
-            }
         }
     }
 
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/metrics/FaultToleranceMetrics.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/metrics/FaultToleranceMetrics.java
index 7edb886..76f8fbd 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/metrics/FaultToleranceMetrics.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/metrics/FaultToleranceMetrics.java
@@ -29,6 +29,8 @@
 import javax.annotation.Priority;
 import javax.enterprise.inject.spi.CDI;
 
+import org.apache.safeguard.impl.config.GeronimoFaultToleranceConfig;
+
 public interface FaultToleranceMetrics {
     Counter counter(String name, String description);
     void gauge(String name, String description, String unit, Supplier<Long> supplier);
@@ -45,7 +47,10 @@
     interface Gauge extends Supplier<Long> {
     }
 
-    static FaultToleranceMetrics create() {
+    static FaultToleranceMetrics create(final GeronimoFaultToleranceConfig config) {
+        if ("false".equalsIgnoreCase(config.read("MP_Fault_Tolerance_Metrics_Enabled"))) {
+            return new NoMetrics();
+        }
         try {
             final Optional<FaultToleranceMetrics> iterator = StreamSupport.stream(
                     ServiceLoader.load(FaultToleranceMetrics.class).spliterator(), false)
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
index aa8aa49..1e6120f 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/retry/BaseRetryInterceptor.java
@@ -25,6 +25,7 @@
 import java.lang.reflect.Method;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
@@ -35,6 +36,7 @@
 import javax.interceptor.InvocationContext;
 
 import org.apache.safeguard.impl.annotation.AnnotationFinder;
+import org.apache.safeguard.impl.asynchronous.BaseAsynchronousInterceptor;
 import org.apache.safeguard.impl.config.ConfigurationMapper;
 import org.apache.safeguard.impl.interceptor.IdGeneratorInterceptor;
 import org.apache.safeguard.impl.metrics.FaultToleranceMetrics;
@@ -55,17 +57,20 @@
             model = cache.create(context);
             models.putIfAbsent(context.getMethod(), model);
         }
+        if (model.disabled) {
+            return context.proceed();
+        }
 
         final Map<String, Object> contextData = context.getContextData();
-        final String contextKey = BaseRetryInterceptor.class.getName() + ".context_"
-                + contextData.get(IdGeneratorInterceptor.class.getName());
+        final Object id = contextData.get(IdGeneratorInterceptor.class.getName());
+        final String contextKey = BaseRetryInterceptor.class.getName() + ".context_" + id;
         Context retryContext = Context.class.cast(contextData.get(contextKey));
         if (retryContext == null) {
             retryContext = new Context(System.nanoTime() + model.maxDuration, model.maxRetries);
             contextData.put(contextKey, retryContext);
         }
 
-        while (retryContext.counter >= 0) {
+        while (retryContext.counter >= 0) { // todo: handle async if result is a Future or CompletionStage (weird no?)
             try {
                 final Object proceed = context.proceed();
                 if (retryContext.counter == model.maxRetries) {
@@ -73,27 +78,42 @@
                 } else {
                     executeFinalCounterAction(contextData, model.callsSucceededRetried);
                 }
+                if (BaseAsynchronousInterceptor.BaseFuture.class.isInstance(proceed)) {
+                    final Model modelRef = model;
+                    contextData.put(BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" + id,
+                        (BaseAsynchronousInterceptor.ErrorHandler<Exception, Future<?>>) error -> {
+                            handleException(contextData, contextKey, modelRef, error);
+                            return Future.class.cast(context.proceed());
+                        });
+                }
                 return proceed;
-            } catch (final CircuitBreakerOpenException cboe) {
-                throw cboe;
             } catch (final Exception re) {
-                // refresh the counter from the other interceptors
-                retryContext = Context.class.cast(contextData.get(contextKey));
-
-                if (model.abortOn(re) || (--retryContext.counter) < 0 || System.nanoTime() >= retryContext.maxEnd) {
-                    executeFinalCounterAction(contextData, model.callsFailed);
-                    throw re;
-                }
-                if (!model.retryOn(re)) {
-                    throw re;
-                }
-                model.retries.inc();
-                Thread.sleep(model.nextPause());
+                handleException(contextData, contextKey, model, re);
             }
         }
         throw new FaultToleranceException("Inaccessible normally, here for compilation");
     }
 
+    private void handleException(final Map<String, Object> contextData, final String contextKey,
+                                 final Model modelRef, final Exception error) throws Exception {
+        if (CircuitBreakerOpenException.class.isInstance(error)) {
+            throw error;
+        }
+
+        // refresh the counter from the other interceptors
+        final Context ctx = Context.class.cast(contextData.get(contextKey));
+
+        if (modelRef.abortOn(error) || (--ctx.counter) < 0 || System.nanoTime() >= ctx.maxEnd) {
+            executeFinalCounterAction(contextData, modelRef.callsFailed);
+            throw error;
+        }
+        if (!modelRef.retryOn(error)) {
+            throw error;
+        }
+        modelRef.retries.inc();
+        Thread.sleep(modelRef.nextPause());
+    }
+
     protected abstract void executeFinalCounterAction(Map<String, Object> contextData, FaultToleranceMetrics.Counter counter);
 
     static class Model {
@@ -118,9 +138,13 @@
 
         private final FaultToleranceMetrics.Counter retries;
 
-        private Model(final Retry retry, final FaultToleranceMetrics.Counter callsSucceededNotRetried,
-                final FaultToleranceMetrics.Counter callsSucceededRetried, final FaultToleranceMetrics.Counter callsFailed,
-                final FaultToleranceMetrics.Counter retries) {
+        private final boolean disabled;
+
+        private Model(final boolean disabled,
+                      final Retry retry, final FaultToleranceMetrics.Counter callsSucceededNotRetried,
+                      final FaultToleranceMetrics.Counter callsSucceededRetried, final FaultToleranceMetrics.Counter callsFailed,
+                      final FaultToleranceMetrics.Counter retries) {
+            this.disabled = disabled;
             this.abortOn = retry.abortOn();
             this.retryOn = retry.retryOn();
             this.maxDuration = retry.delayUnit().getDuration().toNanos() * retry.maxDuration();
@@ -131,6 +155,19 @@
             this.callsSucceededRetried = callsSucceededRetried;
             this.callsFailed = callsFailed;
             this.retries = retries;
+
+            if (maxRetries < 0) {
+                throw new FaultToleranceException("max retries can't be negative");
+            }
+            if (delay < 0) {
+                throw new FaultToleranceException("delay can't be negative");
+            }
+            if (maxDuration < 0) {
+                throw new FaultToleranceException("max duration can't be negative");
+            }
+            if (jitter < 0) {
+                throw new FaultToleranceException("jitter can't be negative");
+            }
         }
 
         private boolean abortOn(final Exception re) {
@@ -175,7 +212,9 @@
             final Retry configuredRetry = configurationMapper.map(retry, context.getMethod(), Retry.class);
             final String metricsNameBase = "ft." + context.getMethod().getDeclaringClass().getCanonicalName() + "."
                     + context.getMethod().getName() + ".retry.";
-            return new Model(configuredRetry,
+            return new Model(
+                    !configurationMapper.isEnabled(context.getMethod(), Retry.class),
+                    configuredRetry,
                     metrics.counter(metricsNameBase + "callsSucceededNotRetried.total",
                             "The number of times the method was called and succeeded without retrying"),
                     metrics.counter(metricsNameBase + "callsSucceededRetried.total",
diff --git a/safeguard-impl/src/main/java/org/apache/safeguard/impl/timeout/TimeoutInterceptor.java b/safeguard-impl/src/main/java/org/apache/safeguard/impl/timeout/TimeoutInterceptor.java
index a7b0c1f..6cec860 100644
--- a/safeguard-impl/src/main/java/org/apache/safeguard/impl/timeout/TimeoutInterceptor.java
+++ b/safeguard-impl/src/main/java/org/apache/safeguard/impl/timeout/TimeoutInterceptor.java
@@ -62,6 +62,10 @@
             model = cache.create(context);
             timeouts.putIfAbsent(context.getMethod(), model);
         }
+        if (model.disabled) {
+            return context.proceed();
+        }
+
         final FutureTask<Object> task = new FutureTask<>(context::proceed);
         final long start = System.nanoTime();
         executor.execute(task);
@@ -100,13 +104,15 @@
     }
 
     private static class Model {
+        private final boolean disabled;
         private final long timeout;
         private final FaultToleranceMetrics.Histogram executionDuration;
         private final FaultToleranceMetrics.Counter timeouts;
         private final FaultToleranceMetrics.Counter successes;
 
-        private Model(final long timeout, final FaultToleranceMetrics.Histogram executionDuration,
+        private Model(final boolean disabled, final long timeout, final FaultToleranceMetrics.Histogram executionDuration,
                       final FaultToleranceMetrics.Counter timeouts, final FaultToleranceMetrics.Counter successes) {
+            this.disabled = disabled;
             this.timeout = timeout;
             this.executionDuration = executionDuration;
             this.timeouts = timeouts;
@@ -139,6 +145,7 @@
             final String metricsNameBase = "ft." + context.getMethod().getDeclaringClass().getCanonicalName() + "." +
                     context.getMethod().getName() + ".timeout.";
             return new Model(
+                    !mapper.isEnabled(context.getMethod(), Timeout.class),
                     timeout.unit().getDuration().toNanos() * timeout.value(),
                     metrics.histogram(metricsNameBase + "executionDuration", "Histogram of execution times for the method"),
                     metrics.counter(metricsNameBase + "callsTimedOut.total", "The number of times the method timed out"),
diff --git a/safeguard-impl/src/main/resources/META-INF/beans.xml b/safeguard-impl/src/main/resources/META-INF/beans.xml
index a130381..1379576 100644
--- a/safeguard-impl/src/main/resources/META-INF/beans.xml
+++ b/safeguard-impl/src/main/resources/META-INF/beans.xml
@@ -16,4 +16,11 @@
   ~  specific language governing permissions and limitations
   ~  under the License.
   -->
-<beans version="1.1" bean-discovery-mode="annotated"/>
+<beans xmlns="http://xmlns.jcp.org/xml/ns/javaee"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+        http://xmlns.jcp.org/xml/ns/javaee
+        http://xmlns.jcp.org/xml/ns/javaee/beans_2_0.xsd"
+       bean-discovery-mode="all" version="2.0">
+  <trim/>
+</beans>
diff --git a/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/ArchiveAppender.java b/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/ArchiveAppender.java
deleted file mode 100644
index 18193d1..0000000
--- a/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/ArchiveAppender.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing,
- *  software distributed under the License is distributed on an
- *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  KIND, either express or implied.  See the License for the
- *  specific language governing permissions and limitations
- *  under the License.
- */
-
-package org.apache.safeguard.ft.tck;
-
-import org.jboss.arquillian.container.test.spi.client.deployment.ApplicationArchiveProcessor;
-import org.jboss.arquillian.test.spi.TestClass;
-import org.jboss.shrinkwrap.api.Archive;
-import org.jboss.shrinkwrap.api.ShrinkWrap;
-import org.jboss.shrinkwrap.api.asset.EmptyAsset;
-import org.jboss.shrinkwrap.api.spec.JavaArchive;
-import org.jboss.shrinkwrap.api.spec.WebArchive;
-
-public class ArchiveAppender implements ApplicationArchiveProcessor {
-
-    @Override
-    public void process(final Archive<?> archive, final TestClass testClass) {
-        archive.as(WebArchive.class)
-                .addAsLibrary(ShrinkWrap.create(JavaArchive.class, "safeguard-impl.jar")
-                        .addPackages(true, "org.apache.safeguard")
-                        .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml"));
-    }
-}
diff --git a/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/SafeguardTCKExtension.java b/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/SafeguardTCKExtension.java
index c665c52..7632f61 100644
--- a/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/SafeguardTCKExtension.java
+++ b/safeguard-impl/src/test/java/org/apache/safeguard/ft/tck/SafeguardTCKExtension.java
@@ -19,6 +19,10 @@
 
 package org.apache.safeguard.ft.tck;
 
+import static java.util.stream.Collectors.toList;
+
+import java.io.File;
+import java.util.List;
 import java.util.stream.Stream;
 
 import javax.enterprise.inject.spi.CDI;
@@ -26,27 +30,54 @@
 import org.apache.safeguard.impl.circuitbreaker.CircuitBreakerInterceptor;
 import org.apache.webbeans.spi.ContainerLifecycle;
 import org.eclipse.microprofile.metrics.MetricRegistry;
+import org.jboss.arquillian.container.spi.client.deployment.DeploymentDescription;
+import org.jboss.arquillian.container.test.impl.client.deployment.AnnotationDeploymentScenarioGenerator;
 import org.jboss.arquillian.container.test.spi.client.deployment.ApplicationArchiveProcessor;
+import org.jboss.arquillian.container.test.spi.client.deployment.DeploymentScenarioGenerator;
 import org.jboss.arquillian.core.api.Instance;
 import org.jboss.arquillian.core.api.annotation.Inject;
 import org.jboss.arquillian.core.api.annotation.Observes;
 import org.jboss.arquillian.core.spi.LoadableExtension;
+import org.jboss.arquillian.test.spi.TestClass;
 import org.jboss.arquillian.test.spi.event.suite.After;
+import org.jboss.shrinkwrap.api.Archive;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.asset.FileAsset;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.jboss.shrinkwrap.api.spec.WebArchive;
 
 public class SafeguardTCKExtension implements LoadableExtension {
 
     @Override
     public void register(final ExtensionBuilder extensionBuilder) {
-        extensionBuilder.service(ApplicationArchiveProcessor.class, ArchiveAppender.class)
+        extensionBuilder
+                .service(ApplicationArchiveProcessor.class, ArchiveAppender.class)
+                .override(DeploymentScenarioGenerator.class, AnnotationDeploymentScenarioGenerator.class, EnrichableGenerator.class)
                 .observer(LeakingTCKWorkaround.class);
     }
 
+    public static class ArchiveAppender implements ApplicationArchiveProcessor {
+
+        private Archive<?> createAuxiliaryArchive() {
+            return ShrinkWrap.create(JavaArchive.class, "safeguard-impl.jar")
+                             .addPackages(true, "org.apache.safeguard")
+                             .addAsManifestResource(new FileAsset(new File("src/main/resources/META-INF/beans.xml")), "beans.xml");
+        }
+
+        @Override
+        public void process(final Archive<?> auxiliaryArchive, final TestClass testClass) {
+            if (auxiliaryArchive.getName().endsWith(".war")) {
+                auxiliaryArchive.as(WebArchive.class).addAsLibrary(createAuxiliaryArchive());
+            }
+        }
+    }
+
     public static class LeakingTCKWorkaround {
 
         @Inject
         private Instance<ContainerLifecycle> lifecycle;
 
-        public void clearCache(@Observes(precedence = -1) final After event) {
+        public void clearCircuitBreakerCache(@Observes(precedence = -1) final After event) {
             final CDI<Object> cdi = CDI.current();
             final MetricRegistry registry = cdi.select(MetricRegistry.class).get();
             cdi.select(CircuitBreakerInterceptor.Cache.class).get().getCircuitBreakers().clear();
@@ -55,4 +86,11 @@
                .forEach(registry::remove);
         }
     }
+
+    public static class EnrichableGenerator extends AnnotationDeploymentScenarioGenerator {
+        @Override
+        public List<DeploymentDescription> generate(final TestClass testClass) {
+            return super.generate(testClass).stream().peek(it -> it.shouldBeTestable(true)).collect(toList());
+        }
+    }
 }
diff --git a/safeguard-impl/src/test/resources/dev.xml b/safeguard-impl/src/test/resources/dev.xml
index 479c6d4..00692cc 100644
--- a/safeguard-impl/src/test/resources/dev.xml
+++ b/safeguard-impl/src/test/resources/dev.xml
@@ -19,13 +19,17 @@
 <suite name="Dev TCK Run" verbose="2" configfailurepolicy="continue" >
   <test name="TCK Run">
     <!-- all TCK -->
+    <!--
     <packages>
       <package name="org.eclipse.microprofile.fault.tolerance.tck.*" />
     </packages>
+    -->
     <classes>
-      <!--
-      <class name="org.eclipse.microprofile.fault.tolerance.tck.CircuitBreakerRetryTest" />
-      -->
+      <class name="org.eclipse.microprofile.fault.tolerance.tck.ConfigTest" />
+      <class name="org.eclipse.microprofile.fault.tolerance.tck.invalidParameters.InvalidRetryDelayDurationTest" />
+      <class name="org.eclipse.microprofile.fault.tolerance.tck.illegalConfig.IncompatibleFallbackTest" />
+      <class name="org.eclipse.microprofile.fault.tolerance.tck.metrics.CircuitBreakerMetricTest" />
+      <class name="org.eclipse.microprofile.fault.tolerance.tck.visibility.retry.RetryVisibilityTest" />
     </classes>
   </test>
 </suite>