| [[transactionalClient-eip]] |
| = Transactional Client |
| |
| Camel recommends supporting the |
| http://www.enterpriseintegrationpatterns.com/TransactionalClient.html[Transactional |
| Client] from the xref:enterprise-integration-patterns.adoc[EIP patterns] |
| using Spring transactions. |
| |
| image::eip/TransactionalClientSolution.gif[image] |
| |
| Transaction Oriented Endpoints like xref:components::jms-component.adoc[JMS] support using a |
| transaction for both inbound and outbound message exchanges. Endpoints |
| that support transactions will participate in the current transaction |
| context that they are called from. |
| |
| == Configuration of Redelivery |
| |
| Redelivery in transacted mode is *not* handled by Camel but by the |
| backing system (the transaction manager). In such cases, consult the |
| documentation of the backing system for how to configure redelivery. |
| |
| You should use the |
| http://camel.apache.org/maven/current/camel-spring/apidocs/org/apache/camel/spring/SpringRouteBuilder.html[SpringRouteBuilder] |
| to set up the routes, since you will need to set up the Spring context with |
| the TransactionTemplates that define the transaction manager |
| configuration and policies. |
| |
| For inbound endpoints to be transacted, they normally need to be |
| configured to use a Spring PlatformTransactionManager. In the case of |
| the JMS component, this can be done by looking it up in the Spring |
| context. |
| |
| First, define the needed objects in the Spring configuration. |
| |
| [source,xml] |
| ---- |
| <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> |
| <property name="connectionFactory" ref="jmsConnectionFactory" /> |
| </bean> |
| |
| <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> |
| <property name="brokerURL" value="tcp://localhost:61616"/> |
| </bean> |
| ---- |
| |
| Then you look them up and use them to create the JmsComponent. |
| |
| [source,java] |
| ---- |
| // Assumes `spring` is the Spring ApplicationContext and `ctx` is the CamelContext. |
| PlatformTransactionManager transactionManager = (PlatformTransactionManager) spring.getBean("jmsTransactionManager"); |
| ConnectionFactory connectionFactory = (ConnectionFactory) spring.getBean("jmsConnectionFactory"); |
| JmsComponent component = JmsComponent.jmsComponentTransacted(connectionFactory, transactionManager); |
| component.getConfiguration().setConcurrentConsumers(1); |
| ctx.addComponent("activemq", component); |
| ---- |
| |
| [[TransactionalClient-TransactionPolicies]] |
| == Transaction Policies |
| |
| Outbound endpoints will automatically enlist in the current transaction |
| context. But what if you do not want your outbound endpoint to enlist in |
| the same transaction as your inbound endpoint? The solution is to add a |
| Transaction Policy to the processing route. You first have to define |
| the transaction policies that you will be using. The policies use a Spring |
| TransactionTemplate under the covers for declaring the transaction |
| demarcation to use. So you will need to add something like the following |
| to your Spring XML: |
| |
| [source,xml] |
| ---- |
| <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> |
| <property name="transactionManager" ref="jmsTransactionManager"/> |
| </bean> |
| |
| <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> |
| <property name="transactionManager" ref="jmsTransactionManager"/> |
| <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/> |
| </bean> |
| ---- |
| |
| Then in your |
| http://camel.apache.org/maven/current/camel-spring/apidocs/org/apache/camel/spring/SpringRouteBuilder.html[SpringRouteBuilder], |
| you just need to look up a SpringTransactionPolicy object for each of |
| the policies. |
| |
| [source,java] |
| ---- |
| public void configure() { |
| ... |
| Policy required = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRED"); |
| Policy requirenew = bean(SpringTransactionPolicy.class, "PROPAGATION_REQUIRES_NEW"); |
| ... |
| } |
| ---- |
| |
| Once created, you can use the Policy objects in your processing routes: |
| |
| [source,java] |
| ---- |
| // Send to bar in a new transaction |
| from("activemq:queue:foo").policy(requirenew) |
| .to("activemq:queue:bar"); |
| |
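| // Send to bar without a transaction. This assumes a notsupported policy |
| // that is defined and looked up in the same way as the policies above. |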
| from("activemq:queue:foo").policy(notsupported) |
| .to("activemq:queue:bar"); |
| ---- |
| |
| [[TransactionalClient-OSGiBlueprint]] |
| == OSGi Blueprint |
| |
| If you are using |
| xref:using-osgi-blueprint-with-camel.adoc[OSGi Blueprint] |
| then you most likely have to explicitly declare a policy and |
| refer to that policy from the transacted element in the route. |
| |
| [source,xml] |
| ---- |
| <bean id="required" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> |
| <property name="transactionManager" ref="jmsTransactionManager"/> |
| <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/> |
| </bean> |
| ---- |
| |
| And then refer to "required" from the route: |
| |
| [source,xml] |
| ---- |
| <route> |
| <from uri="activemq:queue:foo"/> |
| <transacted ref="required"/> |
| <to uri="activemq:queue:bar"/> |
| </route> |
| ---- |
| |
| [[TransactionalClient-DatabaseSample]] |
| == Database Sample |
| |
| In this sample we want to ensure that two endpoints are under transaction |
| control. These two endpoints insert data into a database. |
| The full sample is available as a |
| https://github.com/apache/camel/tree/master/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceMinimalConfigurationTest.java[unit test]. |
| |
| First we set up the usual Spring beans in the Spring configuration file. |
| Here we have defined a DataSource for an embedded Derby database and, most |
| importantly, the Spring DataSourceTransactionManager that does the |
| heavy lifting of enforcing our transactional policies. You are of course |
| free to use any Spring-based TransactionManager, e.g. if you are |
| in a full-blown J2EE container you could use JTA or the WebLogic- or |
| WebSphere-specific managers. |
| |
| As we use convention over configuration, we do *not* need to |
| configure a transaction policy bean, so we do not have any |
| `PROPAGATION_REQUIRED` beans. All the beans that need to be configured are |
| *standard* Spring beans, i.e. there is no Camel-specific |
| configuration at all. |
| |
| [source,xml] |
| ---- |
| <!-- this example uses JDBC so we define a data source --> |
| <jdbc:embedded-database id="dataSource" type="DERBY"> |
| <jdbc:script location="classpath:sql/init.sql" /> |
| </jdbc:embedded-database> |
| |
| <!-- spring transaction manager --> |
| <!-- this is the transaction manager Camel will use for transacted routes --> |
| <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> |
| <property name="dataSource" ref="dataSource"/> |
| </bean> |
| |
| <!-- bean for book business logic --> |
| <bean id="bookService" class="org.apache.camel.spring.interceptor.BookService"> |
| <property name="dataSource" ref="dataSource"/> |
| </bean> |
| ---- |
| |
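| Then we are ready to define our Camel routes. We have two routes: one for |
| success conditions, and one for a forced rollback condition. In the second |
| route the insert of `Donkey in Action` fails, so the transaction, including |
| the preceding `Tiger in Action` insert, is rolled back. |
| |
| This is after all based on a unit test. Notice that we mark each route |
| as transacted using the *transacted* tag. |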
| |
| [source,xml] |
| ---- |
| <camelContext xmlns="http://camel.apache.org/schema/spring"> |
| <route> |
| <from uri="direct:okay"/> |
| <!-- we mark this route as transacted. Camel will look up the Spring transaction manager |
| and use it by default. We can optionally pass in arguments to specify a policy to use |
| that is configured with a Spring transaction manager of choice. However, Camel supports |
| convention over configuration, so we can just use the defaults out of the box, |
| which suit most situations --> |
| <transacted/> |
| <setBody> |
| <constant>Tiger in Action</constant> |
| </setBody> |
| <bean ref="bookService"/> |
| <setBody> |
| <constant>Elephant in Action</constant> |
| </setBody> |
| <bean ref="bookService"/> |
| </route> |
| |
| <route> |
| <from uri="direct:fail"/> |
| <!-- we mark this route as transacted. See comments above. --> |
| <transacted/> |
| <setBody> |
| <constant>Tiger in Action</constant> |
| </setBody> |
| <bean ref="bookService"/> |
| <setBody> |
| <constant>Donkey in Action</constant> |
| </setBody> |
| <bean ref="bookService"/> |
| </route> |
| </camelContext> |
| ---- |
| |
| That is all that is needed to configure a Camel route as being transacted. |
| Just remember to use the *transacted* DSL. The rest is standard Spring |
| XML to set up the transaction manager. |
| |
| [[TransactionalClient-JMSSample]] |
| == JMS Sample |
| |
| In this sample we want to listen for messages on a queue and process the |
| messages with our business logic Java code and send them along. Since |
| it is based on a |
| https://github.com/apache/camel/tree/master/components/camel-jms/src/test/java/org/apache/camel/component/jms/tx/TransactionMinimalConfigurationTest.java[unit test] |
| the destination is a mock endpoint. |
| |
| First we configure the standard Spring XML to declare a JMS connection |
| factory, a JMS transaction manager and our ActiveMQ component that we |
| use in our routing. |
| |
| [source,xml] |
| ---- |
| <!-- setup JMS connection factory --> |
| <bean id="poolConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" init-method="start" destroy-method="stop"> |
| <property name="maxConnections" value="8"/> |
| <property name="connectionFactory" ref="jmsConnectionFactory"/> |
| </bean> |
| |
| <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> |
| <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/> |
| </bean> |
| |
| <!-- setup spring jms TX manager --> |
| <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> |
| <property name="connectionFactory" ref="poolConnectionFactory"/> |
| </bean> |
| |
| <!-- define our activemq component --> |
| <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> |
| <property name="connectionFactory" ref="poolConnectionFactory"/> |
| <!-- define the jms consumer/producer as transacted --> |
| <property name="transacted" value="true"/> |
| <!-- setup the transaction manager to use --> |
| <!-- if not provided then Camel will automatically use a JmsTransactionManager; however, if you |
| for instance use a JTA transaction manager then you must configure it --> |
| <property name="transactionManager" ref="jmsTransactionManager"/> |
| </bean> |
| ---- |
| |
| And then we configure our routes. Notice that all we have to do is mark the |
| route as transacted using the *transacted* tag. |
| |
| [source,xml] |
| ---- |
| <camelContext xmlns="http://camel.apache.org/schema/spring"> |
| <!-- disable JMX during testing --> |
| <jmxAgent id="agent" disabled="true"/> |
| <route> |
| <!-- 1: from the jms queue --> |
| <from uri="activemq:queue:okay"/> |
| <!-- 2: mark this route as transacted --> |
| <transacted/> |
| <!-- 3: call our business logic that is myProcessor --> |
| <process ref="myProcessor"/> |
| <!-- 4: if success then send it to the mock --> |
| <to uri="mock:result"/> |
| </route> |
| </camelContext> |
| |
| <bean id="myProcessor" class="org.apache.camel.component.jms.tx.JMSTransactionalClientTest$MyProcessor"/> |
| ---- |
| |
| === Transaction error handler |
| |
| When a route is marked as transacted using *transacted*, Camel will |
| automatically use the |
| xref:transactionerrorhandler.adoc[TransactionErrorHandler] |
| as xref:error-handler.adoc[Error Handler]. |
| It supports basically the same feature set as the |
| xref:defaulterrorhandler.adoc[DefaultErrorHandler], |
| so you can for instance use |
| xref:exception-clause.adoc[Exception Clause] |
| as well. |
| |
| [[TransactionalClient-IntegrationTestingwithSpring]] |
| == Integration Testing with Spring |
| |
| An integration test here means a test runner class annotated with |
| `@RunWith(SpringJUnit4ClassRunner.class)`. |
| |
| When following the Spring Transactions documentation it is tempting to |
| annotate your integration test with `@Transactional` then seed your |
| database before firing up the route to be tested and sending a message |
| in. This is incorrect as Spring will have an in-progress transaction, |
| and Camel will wait on this before proceeding, leading to the route |
| timing out. |
| |
| Instead, remove the `@Transactional` annotation from the test method and |
| seed the test data within a `TransactionTemplate` execution which will |
| ensure the data is committed to the database before Camel attempts to |
| pick up and use the transaction manager. A simple |
| example https://github.com/rajivj2/example2/blob/master/src/test/java/com/example/NotificationRouterIT.java[can |
| be found on GitHub]. |
| |
| Spring's transactional model ensures each transaction is bound to one |
| thread. A Camel route may invoke additional threads, which is where the |
| blockage may occur. This is not a fault of Camel, but as the programmer |
| you must be aware of the consequences of beginning a transaction in a |
| test thread and expecting a separate thread created by your Camel route |
| to participate, which it cannot. You can, in your test, mock the |
| parts that cause separate threads to avoid this issue. |
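| |
| The thread binding described above can be illustrated without Spring or |
| Camel. The following is a minimal plain-Java sketch, assuming a transaction |
| context that is kept in a `ThreadLocal`, which is how Spring binds |
| transactional resources to a thread. The class `ThreadBoundTxDemo` and the |
| name `tx-1` are hypothetical and exist only for illustration. |
| |
| [source,java] |
| ---- |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| |
| public class ThreadBoundTxDemo { |
| |
|     // Hypothetical stand-in for a thread-bound transaction context. |
|     private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>(); |
| |
|     /** Binds a transaction to the calling thread and reads it back on the same thread. */ |
|     public static String txSeenByCaller() { |
|         CURRENT_TX.set("tx-1"); |
|         try { |
|             return CURRENT_TX.get(); |
|         } finally { |
|             CURRENT_TX.remove(); |
|         } |
|     } |
| |
|     /** Binds a transaction to the calling thread, then reads it from a second thread. */ |
|     public static String txSeenByWorker() { |
|         CURRENT_TX.set("tx-1"); |
|         ExecutorService worker = Executors.newSingleThreadExecutor(); |
|         try { |
|             // The worker thread has its own ThreadLocal slot, so it sees no transaction. |
|             Callable<String> read = CURRENT_TX::get; |
|             return worker.submit(read).get(); |
|         } catch (Exception e) { |
|             throw new IllegalStateException(e); |
|         } finally { |
|             worker.shutdown(); |
|             CURRENT_TX.remove(); |
|         } |
|     } |
| |
|     public static void main(String[] args) { |
|         System.out.println("caller sees: " + txSeenByCaller()); |
|         System.out.println("worker sees: " + txSeenByWorker()); |
|     } |
| } |
| ---- |
| |
| Running `main` prints `caller sees: tx-1` followed by `worker sees: null`. |
| A route step that runs on another thread is in the same position as the |
| worker here: it cannot see the transaction started by the test thread. |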
| |
| [[TransactionalClient-Usingmultiplerouteswithdifferentpropagationbehaviors]] |
| == Using multiple routes with different propagation behaviors |
| |
| *Since Camel 2.2* |
| |
| Suppose you want to route a message through two routes, where the |
| second route should run in its own transaction. How do you do that? You |
| configure propagation behaviors as follows: |
| |
| * The first route uses `PROPAGATION_REQUIRED` |
| * The second route uses `PROPAGATION_REQUIRES_NEW` |
| |
| This is configured in the Spring XML file: |
| |
| [source,xml] |
| ---- |
| <bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> |
| <property name="transactionManager" ref="txManager"/> |
| <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED"/> |
| </bean> |
| |
| <bean id="PROPAGATION_REQUIRES_NEW" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> |
| <property name="transactionManager" ref="txManager"/> |
| <property name="propagationBehaviorName" value="PROPAGATION_REQUIRES_NEW"/> |
| </bean> |
| ---- |
| |
| Then in each route you use the transacted DSL to indicate which of these two |
| propagation behaviors it uses. |
| |
| [source,java] |
| ---- |
| from("direct:mixed") |
| // using required |
| .transacted("PROPAGATION_REQUIRED") |
| // all these steps will be okay |
| .setBody(constant("Tiger in Action")).bean("bookService") |
| .setBody(constant("Elephant in Action")).bean("bookService") |
| // continue on route 2 |
| .to("direct:mixed2"); |
| |
| from("direct:mixed2") |
| // tell Camel that if this route fails then only rollback this last route |
| // by using (rollback only *last*) |
| .onException(Exception.class).markRollbackOnlyLast().end() |
| // using a different propagation which is requires new |
| .transacted("PROPAGATION_REQUIRES_NEW") |
| // this step will be okay |
| .setBody(constant("Lion in Action")).bean("bookService") |
| // this step will fail with donkey |
| .setBody(constant("Donkey in Action")).bean("bookService"); |
| ---- |
| |
| Notice how we have configured `onException` in the second route to indicate that, in |
| case of any exception, we should handle it and roll back only this |
| transaction. This is done using `markRollbackOnlyLast`, which tells |
| Camel to do so only for the current transaction and not globally. |
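| |
| To make the outcome of the two routes above concrete, here is a toy |
| plain-Java model of the propagation semantics. It does not use Camel or |
| Spring: `PropagationDemo`, `Tx`, and the in-memory `committed` list are |
| hypothetical stand-ins, and the rule that `Donkey in Action` is rejected is |
| taken from the comments in the routes above. The outer scope plays the role |
| of `PROPAGATION_REQUIRED`; the inner scope starts its own transaction, as |
| `PROPAGATION_REQUIRES_NEW` does, and only that inner transaction is rolled back. |
| |
| [source,java] |
| ---- |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| public class PropagationDemo { |
| |
|     /** Hypothetical in-memory transaction: inserts are buffered until commit. */ |
|     static final class Tx { |
|         private final List<String> pending = new ArrayList<>(); |
|         private final List<String> store; |
| |
|         Tx(List<String> store) { |
|             this.store = store; |
|         } |
| |
|         void insert(String title) { |
|             // Mirrors the failing step in the second route. |
|             if (title.startsWith("Donkey")) { |
|                 throw new IllegalArgumentException("no donkeys: " + title); |
|             } |
|             pending.add(title); |
|         } |
| |
|         void commit() { |
|             store.addAll(pending); |
|             pending.clear(); |
|         } |
| |
|         void rollback() { |
|             pending.clear(); |
|         } |
|     } |
| |
|     /** Runs both "routes" and returns what ends up committed. */ |
|     public static List<String> run() { |
|         List<String> committed = new ArrayList<>(); |
| |
|         // First route: outer transaction (models PROPAGATION_REQUIRED). |
|         Tx outer = new Tx(committed); |
|         outer.insert("Tiger in Action"); |
|         outer.insert("Elephant in Action"); |
| |
|         // Second route: independent transaction (models PROPAGATION_REQUIRES_NEW). |
|         Tx inner = new Tx(committed); |
|         try { |
|             inner.insert("Lion in Action"); |
|             inner.insert("Donkey in Action"); // fails |
|             inner.commit(); |
|         } catch (IllegalArgumentException e) { |
|             // Roll back only the last (inner) transaction; the outer one is untouched. |
|             inner.rollback(); |
|         } |
| |
|         outer.commit(); |
|         return committed; |
|     } |
| |
|     public static void main(String[] args) { |
|         System.out.println(run()); |
|     } |
| } |
| ---- |
| |
| Running `main` prints `[Tiger in Action, Elephant in Action]`: the Lion |
| insert is discarded together with the failing Donkey insert, while the work |
| of the first route is committed. |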
| |