| = SQL Component |
| :doctitle: SQL |
| :shortname: sql |
| :artifactid: camel-sql |
| :description: Perform SQL queries using Spring JDBC. |
| :since: 1.4 |
| :supportlevel: Stable |
| :tabs-sync-option: |
| :component-header: Both producer and consumer are supported |
| //Manually maintained attributes |
| :camel-spring-boot-name: sql |
| |
| *Since Camel {since}* |
| |
| *{component-header}* |
| |
| The SQL component allows you to work with databases using JDBC |
| queries. The difference between this component and xref:jdbc-component.adoc[JDBC] |
| component is that in case of SQL the query is a property of the endpoint |
| and it uses message payload as parameters passed to the query. |
| |
| This component uses `spring-jdbc` behind the scenes for the actual SQL |
| handling. |
| |
| Maven users will need to add the following dependency to their `pom.xml` |
| for this component: |
| |
| [source,xml] |
| ---- |
| <dependency> |
| <groupId>org.apache.camel</groupId> |
| <artifactId>camel-sql</artifactId> |
| <version>x.x.x</version> |
| <!-- use the same version as your Camel core version --> |
| </dependency> |
| ---- |
| |
| The SQL component also supports: |
| |
| * a JDBC based repository for the Idempotent Consumer EIP pattern. See further below. |
| * a JDBC based repository for the Aggregator EIP pattern. See further below. |
| |
| == URI format |
| |
| [TIP] |
| ==== |
| This component can be used as a |
| xref:eips:transactional-client.adoc[Transactional Client]. |
| ==== |
| |
| The SQL component uses the following endpoint URI notation: |
| |
| ---- |
| sql:select * from table where id=# order by name[?options] |
| ---- |
| |
| You can use named parameters by using `:#name_of_the_parameter` style as shown: |
| |
| ---- |
| sql:select * from table where id=:#myId order by name[?options] |
| ---- |
| |
| When using named parameters, Camel will lookup the names from, in the |
| given precedence: |
| |
| 1. from a xref:languages:simple-language.adoc[Simple] expressions |
| 2. from message body if its a `java.util.Map` |
| 3. from message headers |
| |
| If a named parameter cannot be resolved, then an exception is thrown. |
| |
| You can use xref:languages:simple-language.adoc[Simple] expressions as parameters as shown: |
| |
| ---- |
| sql:select * from table where id=:#${exchangeProperty.myId} order by name[?options] |
| ---- |
| |
| And the xref:languages:simple-language.adoc[Simple] can also be used with POJO message bodies, |
| to use getters for SQL parameters such as: |
| |
| ---- |
| sql:insert into project (FIRST, LAST, CONTACT_MAIL) |
| values (:#${body.firstName}, :#${body.lastName}, :#${body.email}) |
| ---- |
| |
| TIP: See more details in xref:languages:simple-language.adoc[Simple] for more complex |
| syntax that can be used in the SQL queries, than the small example above. |
| |
| IMPORTANT: Notice that the standard `?` symbol that denotes the parameters to an |
| SQL query is substituted with the `pass:[#]` symbol, because the `?` symbol is |
| used to specify options for the endpoint. The `?` symbol replacement can |
| be configured on endpoint basis. |
| |
| You can externalize your SQL queries to files in the classpath or file system as shown: |
| |
| ---- |
| sql:classpath:sql/myquery.sql[?options] |
| ---- |
| |
| And the `myquery.sql` file is in the classpath and is just a plain text |
| |
| [source,sql] |
| ---- |
| -- this is a comment |
| select * |
| from table |
| where |
| id = :#${exchangeProperty.myId} |
| order by |
| name |
| ---- |
| |
| In the file you can use multilines and format the SQL as you wish. And |
| also use comments such as the – dash line. |
| |
| |
| // component-configure options: START |
| |
| // component-configure options: END |
| |
| // component options: START |
| include::partial$component-configure-options.adoc[] |
| include::partial$component-endpoint-options.adoc[] |
| // component options: END |
| |
| // endpoint options: START |
| |
| // endpoint options: END |
| // component headers: START |
| include::partial$component-endpoint-headers.adoc[] |
| // component headers: END |
| |
| == Treatment of the message body |
| |
| The SQL component tries to convert the message body to an object of |
| `java.util.Iterator` type and then uses this iterator to fill the query |
| parameters (where each query parameter is represented by a `pass:[#]` symbol |
| (or configured placeholder) in the endpoint URI). If the message body is |
| not an array or collection, the conversion results in an iterator that |
| iterates over only one object, which is the body itself. |
| |
| For example, if the message body is an instance of `java.util.List`, the |
| first item in the list is substituted into the first occurrence of `pass:[#]` |
| in the SQL query, the second item in the list is substituted into the |
| second occurrence of `pass:[#]`, and so on. |
| |
| If `batch` is set to `true`, then the interpretation of the inbound |
| message body changes slightly – instead of an iterator of parameters, |
| the component expects an iterator that contains the parameter iterators; |
| the size of the outer iterator determines the batch size. |
| |
| You can use the option `useMessageBodyForSql` that |
| allows to use the message body as the SQL statement, and then the SQL |
| parameters must be provided in a header with the |
| key `SqlConstants.SQL_PARAMETERS`. This allows the SQL component to work |
| more dynamically as the SQL query is from the message body. Use templating |
| (such as xref:components::velocity-component.adoc[Velocity], xref:components::freemarker-component.adoc[Freemarker]) |
| for conditional processing, e.g. to include or exclude `where` clauses |
| depending on the presence of query parameters. |
| |
| == Result of the query |
| |
| For `select` operations, the result is an instance of |
| `List<Map<String, Object>>` type, as returned by the |
| http://static.springframework.org/spring/docs/6.0.x/api/org/springframework/jdbc/core/JdbcTemplate.html#queryForList(java.lang.String,%20java.lang.Object%91%93)[JdbcTemplate.queryForList()] |
| method. For `update` operations, a `NULL` body is returned as the `update` operation is only set as a header and never as a body. |
| |
| By default, the result is placed in the message body. If the |
| outputHeader parameter is set, the result is placed in the header. This |
| is an alternative to using a full message enrichment pattern to add |
| headers, it provides a concise syntax for querying a sequence or some |
| other small value into a header. It is convenient to use outputHeader |
| and outputType together: |
| |
| [source,java] |
| ---- |
| from("jms:order.inbox") |
| .to("sql:select order_seq.nextval from dual?outputHeader=OrderId&outputType=SelectOne") |
| .to("jms:order.booking"); |
| ---- |
| |
| == Using StreamList |
| |
| The producer supports `outputType=StreamList` |
| that uses an iterator to stream the output of the query. This allows to |
| process the data in a streaming fashion which for example can be used by |
| the Splitter EIP to process each row one at a time, |
| and load data from the database as needed. |
| |
| [source,java] |
| ---- |
| from("direct:withSplitModel") |
| .to("sql:select * from projects order by id?outputType=StreamList&outputClass=org.apache.camel.component.sql.ProjectModel") |
| .to("log:stream") |
| .split(body()).streaming() |
| .to("log:row") |
| .to("mock:result") |
| .end(); |
| ---- |
| |
| == Generated keys |
| |
| If you insert data using SQL INSERT, then the RDBMS may support auto |
| generated keys. You can instruct the SQL producer to return the |
| generated keys in headers. To do that set the header `CamelSqlRetrieveGeneratedKeys=true`. |
| Then the generated keys will be provided as headers with the keys listed in the table above. |
| |
| To specify which generated columns should be retrieved, set the header `CamelSqlGeneratedColumns` |
| to a `String[]` or `int[]`, indicating the column names or indexes, respectively. Some databases |
| require this, such as Oracle. It may also be necessary to use the `parametersCount` option if the |
| driver cannot correctly determine the number of parameters. |
| |
| You can see more details in this |
| https://gitbox.apache.org/repos/asf?p=camel.git;a=blob_plain;f=components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlGeneratedKeysTest.java;hb=HEAD[unit test]. |
| |
| == DataSource |
| |
| You can set a reference to a `DataSource` in the URI directly: |
| |
| [source,text] |
| ---- |
| sql:select * from table where id=# order by name?dataSource=#myDS |
| ---- |
| |
| == Using named parameters |
| |
| In the given route below, we want to get all the projects from the |
| projects table. Notice the SQL query has 2 named parameters, `:#lic` and |
| `:#min`. Camel will then look up for these parameters from the message body or |
| message headers. Notice in the example above we set two headers with |
| constant value for the named parameters: |
| |
| [source,java] |
| ---- |
| from("direct:projects") |
| .setHeader("lic", constant("ASF")) |
| .setHeader("min", constant(123)) |
| .to("sql:select * from projects where license = :#lic and id > :#min order by id") |
| ---- |
| |
| Though if the message body is a `java.util.Map` then the named |
| parameters will be taken from the body. |
| |
| [source,java] |
| ---- |
| from("direct:projects") |
| .to("sql:select * from projects where license = :#lic and id > :#min order by id") |
| ---- |
| |
| == Using expression parameters in producers |
| |
| In the given route below, we want to get all the project from the |
| database. It uses the body of the exchange for defining the license and |
| uses the value of a property as the second parameter. |
| |
| [source,java] |
| ---- |
| from("direct:projects") |
| .setBody(constant("ASF")) |
| .setProperty("min", constant(123)) |
| .to("sql:select * from projects where license = :#${body} and id > :#${exchangeProperty.min} order by id") |
| ---- |
| |
| === Using expression parameters in consumers |
| |
| When using the SQL component as consumer, you can now also use expression parameters (simple language) |
| to build dynamic query parameters, such as calling a method on a bean to retrieve an id, date or something. |
| |
| For example in the sample below we call the nextId method on the bean myIdGenerator: |
| |
| [source,java] |
| ---- |
| from("sql:select * from projects where id = :#${bean:myIdGenerator.nextId}") |
| .to("mock:result"); |
| ---- |
| |
| And the bean has the following method: |
| |
| [source,java] |
| ---- |
| public static class MyIdGenerator { |
| |
| private int id = 1; |
| |
| public int nextId() { |
| return id++; |
| } |
| |
| } |
| ---- |
| |
| Notice that there is no existing `Exchange` with message body and headers, so |
| the simple expression you can use in the consumer are most useable for calling |
| bean methods as in this example. |
| |
| == Using IN queries with dynamic values |
| |
| The SQL producer allows to use SQL queries with |
| IN statements where the IN values is dynamic computed. For example from |
| the message body or a header etc. |
| |
| To use IN you need to: |
| |
| * prefix the parameter name with `in:` |
| * add `( )` around the parameter |
| |
| An example explains this better. The following query is used: |
| |
| [source,sql] |
| ---- |
| -- this is a comment |
| select * |
| from projects |
| where project in (:#in:names) |
| order by id |
| ---- |
| |
| In the following route: |
| |
| [source,java] |
| ---- |
| from("direct:query") |
| .to("sql:classpath:sql/selectProjectsIn.sql") |
| .to("log:query") |
| .to("mock:query"); |
| ---- |
| |
| Then the IN query can use a header with the key names with the dynamic |
| values such as: |
| |
| [source,java] |
| ---- |
| // use an array |
| template.requestBodyAndHeader("direct:query", "Hi there!", "names", new String[]{"Camel", "AMQ"}); |
| |
| // use a list |
| List<String> names = new ArrayList<String>(); |
| names.add("Camel"); |
| names.add("AMQ"); |
| |
| template.requestBodyAndHeader("direct:query", "Hi there!", "names", names); |
| |
| // use a string separated values with comma |
| template.requestBodyAndHeader("direct:query", "Hi there!", "names", "Camel,AMQ"); |
| ---- |
| |
| The query can also be specified in the endpoint instead of being |
| externalized (notice that externalizing makes maintaining the SQL |
| queries easier) |
| |
| [source,java] |
| ---- |
| from("direct:query") |
| .to("sql:select * from projects where project in (:#in:names) order by id") |
| .to("log:query") |
| .to("mock:query"); |
| ---- |
| |
| == Using the JDBC based idempotent repository |
| |
| In this section we will use the JDBC based |
| idempotent repository. |
| |
| [TIP] |
| ==== |
| *Abstract class* |
| |
| There is an abstract class |
| `org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository` |
| you can extend to build custom JDBC idempotent repository. |
| ==== |
| |
| First we have to create the database table which will be used by the |
| idempotent repository. We use the following schema: |
| |
| [source,sql] |
| ---- |
| CREATE TABLE CAMEL_MESSAGEPROCESSED ( processorName VARCHAR(255), |
| messageId VARCHAR(100), createdAt TIMESTAMP, PRIMARY KEY (processorName, messageId) ) |
| ---- |
| |
| WARNING: The SQL Server *TIMESTAMP* type is a fixed-length binary-string type. It |
| does not map to any of the JDBC time types: *DATE*, *TIME*, or |
| *TIMESTAMP*. |
| |
| The above SQL is consistent with most popular SQL vendors. |
| |
| When working with concurrent consumers it is crucial to create a unique constraint on the column combination of |
| processorName and messageId. This constraint will be prevent multiple consumers adding the same key to the repository |
| and allow only one consumer to handle the message. |
| |
| The SQL above includes the constraint by creating a primary key. If you prefer to use a different |
| constraint, or your SQL server uses a different syntax for table creation, you can create the table yourself using |
| the above schema as a starting point. |
| |
| === Customize the JDBC idempotency repository |
| |
| You have a few options to tune the |
| `org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository` for |
| your needs: |
| |
| [cols="1,1,2"] |
| |=== |
| |Parameter |Default Value |Description |
| |
| |createTableIfNotExists |true |Defines whether or not Camel should try to create the table if it |
| doesn't exist. |
| |
| |tableName | CAMEL_MESSAGEPROCESSED | To use a custom table name instead of the default name: CAMEL_MESSAGEPROCESSED. |
| |
| |tableExistsString |SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0 |This query is used to figure out whether the table already exists or |
| not. It must throw an exception to indicate the table doesn't exist. |
| |
| |createString |CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), |
| messageId VARCHAR(100), createdAt TIMESTAMP) |The statement which is used to create the table. |
| |
| |queryString |SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND |
| messageId = ? |The query which is used to figure out whether the message already exists |
| in the repository (the result is not equals to '0'). It takes two |
| parameters. This first one is the processor name (`String`) and the |
| second one is the message id (`String`). |
| |
| |insertString |INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) |
| VALUES (?, ?, ?) |The statement which is used to add the entry into the table. It takes |
| three parameter. The first one is the processor name (`String`), the |
| second one is the message id (`String`) and the third one is the |
| timestamp (`java.sql.Timestamp`) when this entry was added to the |
| repository. |
| |
| |deleteString |DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ? |The statement which is used to delete the entry from the database. |
| It takes two parameter. This first one is the processor name (`String`) and |
| the second one is the message id (`String`). |
| |=== |
| |
| The option `tableName` can be used to use the default SQL queries but with a different table name. |
| However, if you want to customize the SQL queries then you can configure each of them individually. |
| |
| === Orphan Lock aware Jdbc IdempotentRepository |
| |
| One of the limitations of `org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository` is that it does not handle orphan locks resulting from JVM crash or non graceful shutdown. This can result in unprocessed files/messages if this is implementation is used with camel-file, camel-ftp etc. if you need to address orphan locks processing then use |
| `org.apache.camel.processor.idempotent.jdbc.JdbcOrphanLockAwareIdempotentRepository`. This repository keeps track of the locks held by an instance of the application. For each lock held, the application will send keep alive signals to the lock repository resulting in updating the createdAt column with the current Timestamp. When an application instance tries to acquire a lock if the, then there are three possibilities exist : |
| |
| * lock entry does not exist then the lock is provided using the base implementation of `JdbcMessageIdRepository`. |
| |
| * lock already exists and the createdAt < System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that an active instance has the lock and the lock is not provided to the new instance requesting the lock |
| |
| * lock already exists and the createdAt > = System.currentTimeMillis() - lockMaxAgeMillis. In this case it is assumed that there is no active instance which has the lock and the lock is provided to the requesting instance. The reason behind is that if the original instance which had the lock, if it was still running, it would have updated the Timestamp on createdAt using its keepAlive mechanism |
| |
| This repository has two additional configuration parameters |
| |
| [cols="1,1"] |
| |=== |
| |Parameter | Description |
| |lockMaxAgeMillis | This refers to the duration after which the lock is considered orphaned i.e. if the currentTimestamp - createdAt >= lockMaxAgeMillis then lock is orphaned. |
| |lockKeepAliveIntervalMillis | The frequency at which keep alive updates are done to createdAt Timestamp column. |
| |=== |
| |
| === Caching Jdbc IdempotentRepository |
| |
| Some SQL implementations are not fast on a per-query basis. The |
| `JdbcMessageIdRepository` implementation does its idempotent checks |
| individually within SQL transactions. Checking a mere 100 keys can |
| take minutes. The `JdbcCachedMessageIdRepository` preloads an in-memory |
| cache on start with the entire list of keys. This cache is then |
| checked first before passing through to the original implementation. |
| |
| As with all cache implementations, there are considerations that should |
| be made with regard to stale data and your specific usage. |
| |
| == Using the JDBC based aggregation repository |
| |
| `JdbcAggregationRepository` is an `AggregationRepository` which on the |
| fly persists the aggregated messages. This ensures that you will not |
| lose messages, as the default aggregator will use an in memory only |
| `AggregationRepository`. The `JdbcAggregationRepository` allows together with Camel to provide |
| persistent support for the Aggregator. |
| |
| Only when an Exchange has been successfully |
| processed it will be marked as complete which happens when the `confirm` |
| method is invoked on the `AggregationRepository`. This means if the same |
| Exchange fails again it will be kept retried until success. |
| |
| You can use option `maximumRedeliveries` to limit the maximum number of |
| redelivery attempts for a given recovered Exchange. |
| You must also set the `deadLetterUri` option so Camel knows where to |
| send the Exchange when the `maximumRedeliveries` was hit. |
| |
| You can see some examples in the unit tests of camel-sql, for example `JdbcAggregateRecoverDeadLetterChannelTest.java` |
| |
| === Database |
| |
| To be operational, each aggregator uses two table: the aggregation and |
| completed one. By convention the completed has the same name as the |
| aggregation one suffixed with `"_COMPLETED"`. The name must be |
| configured in the Spring bean with the `RepositoryName` property. In the |
| following example aggregation will be used. |
| |
| The table structure definition of both table are identical: in both case |
| a String value is used as key (*id*) whereas a Blob contains the |
| exchange serialized in byte array. |
| However, one difference should be remembered: the *id* field does not |
| have the same content depending on the table. |
| In the aggregation table *id* holds the correlation id used by the |
| component to aggregate the messages. In the completed table, *id* holds |
| the id of the exchange stored in corresponding the blob field. |
| |
| Here is the SQL query used to create the tables, just replace |
| `"aggregation"` with your aggregator repository name. |
| |
| [source,sql] |
| ----- |
| CREATE TABLE aggregation ( |
| id varchar(255) NOT NULL, |
| exchange blob NOT NULL, |
| version BIGINT NOT NULL, |
| constraint aggregation_pk PRIMARY KEY (id) |
| ); |
| CREATE TABLE aggregation_completed ( |
| id varchar(255) NOT NULL, |
| exchange blob NOT NULL, |
| version BIGINT NOT NULL, |
| constraint aggregation_completed_pk PRIMARY KEY (id) |
| ); |
| ----- |
| |
| |
| == Storing body and headers as text |
| |
| You can configure the `JdbcAggregationRepository` to store message body |
| and select(ed) headers as String in separate columns. For example to |
| store the body, and the following two headers `companyName` and |
| `accountName` use the following SQL: |
| |
| [source,sql] |
| ---- |
| CREATE TABLE aggregationRepo3 ( |
| id varchar(255) NOT NULL, |
| exchange blob NOT NULL, |
| version BIGINT NOT NULL, |
| body varchar(1000), |
| companyName varchar(1000), |
| accountName varchar(1000), |
| constraint aggregationRepo3_pk PRIMARY KEY (id) |
| ); |
| CREATE TABLE aggregationRepo3_completed ( |
| id varchar(255) NOT NULL, |
| exchange blob NOT NULL, |
| version BIGINT NOT NULL, |
| body varchar(1000), |
| companyName varchar(1000), |
| accountName varchar(1000), |
| constraint aggregationRepo3_completed_pk PRIMARY KEY (id) |
| ); |
| ---- |
| |
| And then configure the repository to enable this behavior as shown below: |
| |
| [source,xml] |
| ---- |
| <bean id="repo3" |
| class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> |
| <property name="repositoryName" value="aggregationRepo3"/> |
| <property name="transactionManager" ref="txManager3"/> |
| <property name="dataSource" ref="dataSource3"/> |
| <!-- configure to store the message body and following headers as text in the repo --> |
| <property name="storeBodyAsText" value="true"/> |
| <property name="headersToStoreAsText"> |
| <list> |
| <value>companyName</value> |
| <value>accountName</value> |
| </list> |
| </property> |
| </bean> |
| ---- |
| |
| === Codec (Serialization) |
| |
| Since they can contain any type of payload, Exchanges are not |
| serializable by design. It is converted into a byte array to be stored |
| in a database BLOB field. All those conversions are handled by the |
| `JdbcCodec` class. One detail of the code requires your attention: the |
| `ClassLoadingAwareObjectInputStream`. |
| |
| The `ClassLoadingAwareObjectInputStream` has been reused from the |
| http://activemq.apache.org/[Apache ActiveMQ] project. It wraps an |
| `ObjectInputStream` and use it with the `ContextClassLoader` rather than |
| the `currentThread` one. The benefit is to be able to load classes |
| exposed by other bundles. This allows the exchange body and headers to |
| have custom types object references. |
| |
| === Transaction |
| |
| A Spring `PlatformTransactionManager` is required to orchestrate |
| transaction. |
| |
| ==== Service (Start/Stop) |
| |
| The `start` method verify the connection of the database and the |
| presence of the required tables. If anything is wrong it will fail |
| during starting. |
| |
| === Aggregator configuration |
| |
| Depending on the targeted environment, the aggregator might need some |
| configuration. As you already know, each aggregator should have its own |
| repository (with the corresponding pair of table created in the |
| database) and a data source. If the default lobHandler is not adapted to |
| your database system, it can be injected with the `lobHandler` property. |
| |
| Here is the declaration for Oracle: |
| |
| [source,xml] |
| ---- |
| <bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> |
| <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> |
| </bean> |
| <bean id="nativeJdbcExtractor" |
| class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> |
| <bean id="repo" |
| class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> |
| <property name="transactionManager" ref="transactionManager"/> |
| <property name="repositoryName" value="aggregation"/> |
| <property name="dataSource" ref="dataSource"/> |
| <!-- Only with Oracle, else use default --> |
| <property name="lobHandler" ref="lobHandler"/> |
| </bean> |
| ---- |
| |
| === Optimistic locking |
| |
| You can turn on `optimisticLocking` and use |
| this JDBC based aggregation repository in a clustered environment where |
| multiple Camel applications shared the same database for the aggregation |
| repository. If there is a race condition there JDBC driver will throw a |
| vendor specific exception which the `JdbcAggregationRepository` can |
| react upon. To know which caused exceptions from the JDBC driver is |
| regarded as an optimistic locking error we need a mapper to do this. |
| Therefore there is a |
| `org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper` |
| allows you to implement your custom logic if needed. There is a default |
| implementation |
| `org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper` |
| which works as follows: |
| |
| The following check is done: |
| |
| * If the caused exception is an `SQLException` then the SQLState is |
| checked if starts with 23. |
| * If the caused exception is a `DataIntegrityViolationException` |
| * If the caused exception class name has "ConstraintViolation" in its |
| name. |
| * Optional checking for FQN class name matches if any class names has been |
| configured. |
| |
| You can in addition add FQN classnames, and if any of the caused |
| exception (or any nested) equals any of the FQN class names, then its an |
| optimistic locking error. |
| |
| Here is an example, where we define 2 extra FQN class names from the |
| JDBC vendor. |
| |
| [source,xml] |
| ---- |
| <bean id="repo" |
| class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> |
| <property name="transactionManager" ref="transactionManager"/> |
| <property name="repositoryName" value="aggregation"/> |
| <property name="dataSource" ref="dataSource"/> |
| <property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> |
| </bean> |
| <!-- use the default mapper with extraFQN class names from our JDBC driver --> |
| <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper"> |
| <property name="classNames"> |
| <util:set> |
| <value>com.foo.sql.MyViolationExceptoion</value> |
| <value>com.foo.sql.MyOtherViolationExceptoion</value> |
| </util:set> |
| </property> |
| </bean> |
| ---- |
| |
| === Propagation behavior |
| |
| `JdbcAggregationRepository` uses two distinct _transaction templates_ from Spring-TX. One is read-only |
| and one is used for read-write operations. |
| |
| However, when using `JdbcAggregationRepository` within a route that itself uses `<transacted />` and there's |
| common `PlatformTransactionManager` used, there may be a need to configure _propagation behavior_ used by |
| transaction templates inside `JdbcAggregationRepository`. |
| |
| Here's a way to do it: |
| [source,xml] |
| ---- |
| <bean id="repo" |
| class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> |
| <property name="propagationBehaviorName" value="PROPAGATION_NESTED" /> |
| </bean> |
| ---- |
| |
| Propagation is specified by constants of `org.springframework.transaction.TransactionDefinition` interface, |
| so `propagationBehaviorName` is convenient setter that allows to use names of the constants. |
| |
| === Clustering |
| |
| JdbcAggregationRepository does not provide recovery in a clustered environment. |
| |
| You may use ClusteredJdbcAggregationRepository that provides a limited support for recovery in a clustered environment : recovery mechanism is dealt separately by members of the cluster, i.e. a member may only recover exchanges that it completed itself. |
| |
| To enable this behaviour, property `recoverByInstance` must be set to true, and `instanceId` property must be defined using a unique identifier (a string) for each member of the cluster. |
| |
| Besides, completed table must have a `instance_id VARCHAR(255)` column. |
| |
| NOTE: Since each member is the only responsible for the recovery of its completed exchanges, if a member is stopped, its completed exchanges will not be recovered until it is restarted, unless you update completed table to affect them to another member (by changing `instance_id` for those completed exchanges). |
| |
| === PostgreSQL case |
| |
| There's special database that may cause problems with optimistic locking used by `JdbcAggregationRepository`. |
| PostgreSQL marks connection as invalid in case of data integrity violation exception (the one with SQLState 23505). |
| This makes the connection effectively unusable within nested transaction. |
| Details can be found |
| https://www.postgresql.org/message-id/200609241203.59292.ralf.wiebicke%40exedio.com[in this document]. |
| |
| `org.apache.camel.processor.aggregate.jdbc.PostgresAggregationRepository` extends `JdbcAggregationRepository` and |
| uses special `INSERT .. ON CONFLICT ..` statement to provide optimistic locking behavior. |
| |
| This statement is (with default aggregation table definition): |
| [source,sql] |
| ---- |
| INSERT INTO aggregation (id, exchange) values (?, ?) ON CONFLICT DO NOTHING |
| ---- |
| |
| Details can be found https://www.postgresql.org/docs/9.5/sql-insert.html[in PostgreSQL documentation]. |
| |
| When this clause is used, `java.sql.PreparedStatement.executeUpdate()` call returns `0` instead of throwing |
| SQLException with SQLState=23505. Further handling is exactly the same as with generic `JdbcAggregationRepository`, |
| but without marking PostgreSQL connection as invalid. |
| |
| == Camel Sql Starter |
| |
| A starter module is available to spring-boot users. When using the starter, |
| the `DataSource` can be directly configured using spring-boot properties. |
| |
| [source,properties] |
| ---- |
| // Example for a mysql datasource |
| spring.datasource.url=jdbc:mysql://localhost/test |
| spring.datasource.username=dbuser |
| spring.datasource.password=dbpass |
| spring.datasource.driver-class-name=com.mysql.jdbc.Driver |
| ---- |
| |
| To use this feature, add the following dependencies to your spring boot pom.xml file: |
| |
| [source,xml] |
| ---- |
| <dependency> |
| <groupId>org.apache.camel.springboot</groupId> |
| <artifactId>camel-sql-starter</artifactId> |
| <version>${camel.version}</version> <!-- use the same version as your Camel core version --> |
| </dependency> |
| |
| <dependency> |
| <groupId>org.springframework.boot</groupId> |
| <artifactId>spring-boot-starter-jdbc</artifactId> |
| <version>${spring-boot-version}</version> |
| </dependency> |
| ---- |
| |
| You should also include the specific database driver, if needed. |
| |
| |
| |
| include::spring-boot:partial$starter.adoc[] |