CAMEL-20778: intercept EIP should not intercept doTry/doCatch/doFinally.
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java
new file mode 100644
index 0000000..a346327
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptableProcessor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Processor;
+
+/**
+ * To control whether a {@link Processor} can be intercepted via {@link InterceptStrategy}.
+ *
+ * Some EIPs such as try/catch/finally cannot be intercepted.
+ */
+public interface InterceptableProcessor {
+
+ /**
+ * Whether the processor can be intercepted or not.
+ *
+ * @return true to allow intercepting, false to skip.
+ */
+ boolean canIntercept();
+
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
index d6be291..7d7a981 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultChannel.java
@@ -36,6 +36,7 @@
import org.apache.camel.spi.Debugger;
import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.InterceptableProcessor;
import org.apache.camel.spi.ManagementInterceptStrategy;
import org.apache.camel.spi.MessageHistoryFactory;
import org.apache.camel.spi.Tracer;
@@ -233,24 +234,28 @@
Collections.reverse(interceptors);
// wrap the output with the configured interceptors
Processor target = nextProcessor;
- for (InterceptStrategy strategy : interceptors) {
- Processor next = target == nextProcessor ? null : nextProcessor;
- // use the fine grained definition (eg the child if available). Its always possible to get back to the parent
- Processor wrapped = strategy.wrapProcessorInInterceptors(route.getCamelContext(), targetOutputDef, target, next);
- if (!(wrapped instanceof AsyncProcessor)) {
- LOG.warn("Interceptor: {} at: {} does not return an AsyncProcessor instance."
- + " This causes the asynchronous routing engine to not work as optimal as possible."
- + " See more details at the InterceptStrategy javadoc."
- + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine,"
- + " but its not the most optimal solution. Please consider changing your interceptor to comply.",
- strategy, definition);
+ boolean skip = target instanceof InterceptableProcessor && !((InterceptableProcessor) target).canIntercept();
+ if (!skip) {
+ for (InterceptStrategy strategy : interceptors) {
+ Processor next = target == nextProcessor ? null : nextProcessor;
+ // use the fine grained definition (eg the child if available). Its always possible to get back to the parent
+ Processor wrapped
+ = strategy.wrapProcessorInInterceptors(route.getCamelContext(), targetOutputDef, target, next);
+ if (!(wrapped instanceof AsyncProcessor)) {
+ LOG.warn("Interceptor: {} at: {} does not return an AsyncProcessor instance."
+ + " This causes the asynchronous routing engine to not work as optimal as possible."
+ + " See more details at the InterceptStrategy javadoc."
+ + " Camel will use a bridge to adapt the interceptor to the asynchronous routing engine,"
+ + " but its not the most optimal solution. Please consider changing your interceptor to comply.",
+ strategy, definition);
+ }
+ if (!(wrapped instanceof WrapAwareProcessor)) {
+ // wrap the target so it becomes a service and we can manage its lifecycle
+ wrapped = PluginHelper.getInternalProcessorFactory(camelContext)
+ .createWrapProcessor(wrapped, target);
+ }
+ target = wrapped;
}
- if (!(wrapped instanceof WrapAwareProcessor)) {
- // wrap the target so it becomes a service and we can manage its lifecycle
- wrapped = PluginHelper.getInternalProcessorFactory(camelContext)
- .createWrapProcessor(wrapped, target);
- }
- target = wrapped;
}
if (route.isStreamCaching()) {
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
index a5e7c73e..6263365 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/CatchProcessor.java
@@ -31,6 +31,7 @@
import org.apache.camel.RollbackExchangeException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.InterceptableProcessor;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
@@ -42,7 +43,7 @@
/**
* A processor which catches exceptions.
*/
-public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware {
+public class CatchProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware, InterceptableProcessor {
private static final Logger LOG = LoggerFactory.getLogger(CatchProcessor.class);
@@ -111,6 +112,11 @@
}
@Override
+ public boolean canIntercept() {
+ return false;
+ }
+
+ @Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
final Exception e = exchange.getException();
Throwable caught = catches(exchange, e);
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java
index d125d0d..97e9f4c 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/FinallyProcessor.java
@@ -22,6 +22,7 @@
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.InterceptableProcessor;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.processor.DelegateAsyncProcessor;
@@ -31,7 +32,8 @@
/**
* Processor to handle do finally supporting asynchronous routing engine
*/
-public class FinallyProcessor extends DelegateAsyncProcessor implements Traceable, IdAware, RouteIdAware {
+public class FinallyProcessor extends DelegateAsyncProcessor
+ implements Traceable, IdAware, RouteIdAware, InterceptableProcessor {
private static final Logger LOG = LoggerFactory.getLogger(FinallyProcessor.class);
@@ -96,6 +98,11 @@
this.routeId = routeId;
}
+ @Override
+ public boolean canIntercept() {
+ return false;
+ }
+
private static final class FinallyAsyncCallback implements AsyncCallback {
private final Exchange exchange;
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java
index 8dd9bd9..64b0f9b 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/TryProcessor.java
@@ -29,6 +29,7 @@
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.InterceptableProcessor;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -41,7 +42,8 @@
/**
* Implements try/catch/finally type processing
*/
-public class TryProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware {
+public class TryProcessor extends AsyncProcessorSupport
+ implements Navigate<Processor>, Traceable, IdAware, RouteIdAware, InterceptableProcessor {
private static final Logger LOG = LoggerFactory.getLogger(TryProcessor.class);
@@ -73,6 +75,11 @@
}
@Override
+ public boolean canIntercept() {
+ return false;
+ }
+
+ @Override
public boolean process(Exchange exchange, AsyncCallback callback) {
reactiveExecutor.schedule(new TryState(exchange, callback));
return false;
diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java
index 264551b..363fc66 100644
--- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java
+++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/InterceptReifier.java
@@ -45,6 +45,7 @@
public Processor wrapProcessorInInterceptors(
CamelContext context, NamedNode definition, Processor target, Processor nextTarget)
throws Exception {
+
// store the target we are intercepting
this.interceptedTarget = target;
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java
new file mode 100644
index 0000000..f02e9b9
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptDoTryCatchTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.intercept;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class InterceptDoTryCatchTest extends ContextTestSupport {
+
+ @Test
+ public void testIntercept() throws Exception {
+ getMockEndpoint("mock:foo").expectedMessageCount(1);
+ getMockEndpoint("mock:bar").expectedMessageCount(1);
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ getMockEndpoint("mock:intercepted").expectedMessageCount(4);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ intercept().to("mock:intercepted");
+
+ from("direct:start")
+ .to("mock:foo")
+ .doTry()
+ .throwException(new IllegalArgumentException("Forced"))
+ .doCatch(Exception.class)
+ .to("mock:bar")
+ .end()
+ .to("mock:result");
+ }
+ };
+ }
+}