CAMEL-20684: Improve MicroProfile Fault Tolerance timer setup
diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
index 7f6e600..765e9fa 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceProcessor.java
@@ -29,7 +29,7 @@
import io.smallrye.faulttolerance.core.fallback.Fallback;
import io.smallrye.faulttolerance.core.stopwatch.SystemStopwatch;
import io.smallrye.faulttolerance.core.timeout.Timeout;
-import io.smallrye.faulttolerance.core.timer.ThreadTimer;
+import io.smallrye.faulttolerance.core.timer.Timer;
import io.smallrye.faulttolerance.core.util.ExceptionDecision;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
@@ -83,12 +83,10 @@
private boolean shutdownScheduledExecutorService;
private ExecutorService executorService;
private boolean shutdownExecutorService;
- private ExecutorService threadTimerExecutorService;
- private boolean shutdownThreadTimerExecutorService;
private ProcessorExchangeFactory processorExchangeFactory;
private PooledExchangeTaskFactory taskFactory;
private PooledExchangeTaskFactory fallbackTaskFactory;
- private ThreadTimer timer;
+ private Timer timer;
public FaultToleranceProcessor(FaultToleranceConfiguration config, Processor processor,
Processor fallbackProcessor) {
@@ -151,6 +149,14 @@
this.executorService = executorService;
}
+ public Timer getTimer() {
+ return timer;
+ }
+
+ public void setTimer(Timer timer) {
+ this.timer = timer;
+ }
+
@Override
public String getTraceLabel() {
return "faultTolerance";
@@ -262,7 +268,7 @@
}
// 2. timeout
if (config.isTimeoutEnabled()) {
- target = new Timeout<>(target, "timeout", config.getTimeoutDuration(), timer);
+ target = new Timeout<>(target, "timeout", config.getTimeoutDuration(), getTimer());
}
// 3. fallback
if (fallbackProcessor != null) {
@@ -349,15 +355,10 @@
protected void doInit() throws Exception {
ObjectHelper.notNull(camelContext, "CamelContext", this);
if (circuitBreaker == null) {
- threadTimerExecutorService
- = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "CircuitBreakerThreadTimer");
- shutdownThreadTimerExecutorService = true;
-
- timer = ThreadTimer.create(threadTimerExecutorService);
circuitBreaker = new CircuitBreaker<>(
invocation(), id, ExceptionDecision.ALWAYS_FAILURE, config.getDelay(), config.getRequestVolumeThreshold(),
config.getFailureRatio(),
- config.getSuccessThreshold(), SystemStopwatch.INSTANCE, timer);
+ config.getSuccessThreshold(), SystemStopwatch.INSTANCE, getTimer());
}
ServiceHelper.initService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
@@ -389,14 +390,6 @@
getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
executorService = null;
}
- if (timer != null) {
- timer.shutdown();
- timer = null;
- }
- if (shutdownThreadTimerExecutorService && threadTimerExecutorService != null) {
- getCamelContext().getExecutorServiceManager().shutdownNow(threadTimerExecutorService);
- threadTimerExecutorService = null;
- }
ServiceHelper.stopService(processorExchangeFactory, taskFactory, fallbackTaskFactory, processor);
}
diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
index f966ad2..d5b19e9 100644
--- a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceReifier.java
@@ -21,7 +21,9 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;
+import io.smallrye.faulttolerance.ExecutorHolder;
import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreaker;
+import io.smallrye.faulttolerance.core.timer.Timer;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.model.CircuitBreakerDefinition;
@@ -69,6 +71,7 @@
answer.setCircuitBreaker(cb);
}
configureBulkheadExecutorService(answer, config);
+ configureTimer(answer);
return answer;
}
@@ -126,6 +129,25 @@
}
}
+ private void configureTimer(FaultToleranceProcessor answer) throws Exception {
+ Timer timer;
+
+ // If running in a CDI container, try to find the singleton scoped ExecutorHolder. Else we have to manage the Timer ourselves
+ ExecutorHolder executorHolder = findSingleByType(ExecutorHolder.class);
+ if (executorHolder != null) {
+ timer = executorHolder.getTimer();
+ } else {
+ FaultToleranceTimerService threadTimerService = camelContext.hasService(FaultToleranceTimerService.class);
+ if (threadTimerService == null) {
+ threadTimerService = new FaultToleranceTimerService();
+ camelContext.addService(threadTimerService);
+ }
+ timer = threadTimerService.getTimer();
+ }
+
+ answer.setTimer(timer);
+ }
+
// *******************************
// Helpers
// *******************************
diff --git a/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceTimerService.java b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceTimerService.java
new file mode 100644
index 0000000..973aa1c
--- /dev/null
+++ b/components/camel-microprofile/camel-microprofile-fault-tolerance/src/main/java/org/apache/camel/component/microprofile/faulttolerance/FaultToleranceTimerService.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.microprofile.faulttolerance;
+
+import java.util.concurrent.ExecutorService;
+
+import io.smallrye.faulttolerance.core.timer.ThreadTimer;
+import io.smallrye.faulttolerance.core.timer.Timer;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * Service to manage the lifecycle of the SmallRye Fault Tolerance Timer. Primarily used when running without CDI
+ * container support.
+ */
+public class FaultToleranceTimerService extends ServiceSupport implements CamelContextAware {
+ private ExecutorService threadTimerExecutorService;
+ private Timer timer;
+ private CamelContext camelContext;
+
+ @Override
+ protected void doInit() throws Exception {
+ threadTimerExecutorService
+ = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "CircuitBreakerThreadTimer");
+ timer = ThreadTimer.create(threadTimerExecutorService);
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (timer != null) {
+ try {
+ timer.shutdown();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ timer = null;
+ }
+ }
+
+ if (threadTimerExecutorService != null) {
+ getCamelContext().getExecutorServiceManager().shutdownNow(threadTimerExecutorService);
+ threadTimerExecutorService = null;
+ }
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return this.camelContext;
+ }
+
+ public Timer getTimer() {
+ return timer;
+ }
+}