Merge pull request #7 from fvaleri/opt-lock-example
JdbcAggregationRepository optimistic locking example
diff --git a/examples/README.adoc b/examples/README.adoc
index 27c6321..faf3ef4 100644
--- a/examples/README.adoc
+++ b/examples/README.adoc
@@ -76,6 +76,8 @@
| link:camel-example-aggregate/README.adoc[Aggregate] (camel-example-aggregate) | EIP | Demonstrates the persistent support for the Camel aggregator
+| link:camel-example-aggregate-dist/README.adoc[Aggregate Distributed] (camel-example-aggregate-dist) | EIP | How to use the JdbcAggregationRepository in a distributed environment
+
| link:camel-example-artemis/README.adoc[Widget Gadget using Apache ActiveMQ Artemis] (camel-example-artemis) | EIP | The widget and gadget example from the EIP book using Apache ActiveMQ Artemis
| link:camel-example-bigxml-split/README.adoc[Bigxml Split] (camel-example-bigxml-split) | EIP | How to deal with big XML files in Camel
diff --git a/examples/camel-example-aggregate-dist/README.adoc b/examples/camel-example-aggregate-dist/README.adoc
new file mode 100644
index 0000000..05ba456
--- /dev/null
+++ b/examples/camel-example-aggregate-dist/README.adoc
@@ -0,0 +1,24 @@
+== Camel Aggregate Distributed
+
+=== Introduction
+
+This example shows how to use the optimistic locking feature of the Camel
+JdbcAggregationRepository in a distributed environment (multiple independent Camel contexts).
+
+=== Build and run
+
+You can build and run the example with this simple command:
+
+....
+mvn clean compile exec:java
+....
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/support.html[let us know].
+
+We also love contributors, so
+https://camel.apache.org/contributing.html[get involved] :-)
+
+The Camel riders!
diff --git a/examples/camel-example-aggregate-dist/pom.xml b/examples/camel-example-aggregate-dist/pom.xml
new file mode 100644
index 0000000..6ee787b
--- /dev/null
+++ b/examples/camel-example-aggregate-dist/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.camel.example</groupId>
+ <artifactId>examples</artifactId>
+ <version>3.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-example-aggregate-dist</artifactId>
+ <packaging>jar</packaging>
+ <name>Camel :: Example :: Aggregate Distributed</name>
+ <description>How to use the JdbcAggregationRepository in a distributed environment</description>
+
+ <properties>
+ <category>EIP</category>
+ <camel.osgi.export.pkg>org.apache.camel.example.*</camel.osgi.export.pkg>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <!-- Add Camel BOM -->
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-bom</artifactId>
+ <version>${camel.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-main</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-timer</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-direct</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-spring</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-bean</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-sql</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>${derby-version}</version>
+ </dependency>
+
+ <!-- logging -->
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
+ <version>${log4j2-version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>${log4j2-version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j2-version}</version>
+ <scope>runtime</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+            <!-- run with: mvn clean compile exec:java (plugin version is managed in examples/pom.xml) -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>${exec-maven-plugin-version}</version>
+                <configuration>
+                    <mainClass>org.apache.camel.example.Application</mainClass>
+                    <includePluginDependencies>false</includePluginDependencies>
+                    <cleanupDaemonThreads>false</cleanupDaemonThreads>
+                </configuration>
+            </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/examples/camel-example-aggregate-dist/src/main/java/org/apache/camel/example/Application.java b/examples/camel-example-aggregate-dist/src/main/java/org/apache/camel/example/Application.java
new file mode 100644
index 0000000..1bcd18c
--- /dev/null
+++ b/examples/camel-example-aggregate-dist/src/main/java/org/apache/camel/example/Application.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.main.Main;
+import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository;
+import org.apache.camel.spi.AggregationRepository;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository.OptimisticLockingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.datasource.SingleConnectionDataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class Application {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Application.class);
+
+ protected static final int THREADS = 20;
+ protected static final int END = 100;
+
+ private static final String CID_HEADER = "corrId";
+ private static final String DB_URL = "jdbc:derby:target/testdb;create=true";
+ private static final String DB_USER = "admin";
+ private static final String DB_PASS = "admin";
+
+ private static String CORRELATION_ID, EXPECTED_RESULT;
+ private static Queue<Integer> INPUT_QUEUE;
+ private static CountDownLatch LATCH;
+
+ public static void main(String[] args) throws Exception {
+ // init
+ CORRELATION_ID = UUID.randomUUID().toString();
+ EXPECTED_RESULT = IntStream.rangeClosed(1, END)
+ .mapToObj(Integer::toString).collect(Collectors.joining("."));
+ INPUT_QUEUE = new ConcurrentLinkedQueue<>();
+ IntStream.rangeClosed(1, END).forEach(INPUT_QUEUE::add);
+ LATCH = new CountDownLatch(THREADS);
+
+ // test
+ ExecutorService executor = Executors.newFixedThreadPool(THREADS);
+ for (int i = 0; i < THREADS; i++) {
+ executor.execute(Application::startCamel);
+ }
+
+ // wait
+ LATCH.await();
+ stop(executor);
+ }
+
+ private static void startCamel() {
+ try {
+ Main camel = new Main();
+ camel.configure().addRoutesBuilder(new RouteBuilder() {
+ @Override
+ public void configure() {
+ from("timer:foo?repeatCount=1&period=1")
+ .setExchangePattern(ExchangePattern.InOnly)
+ .bean(new MyProducerBean());
+
+ from("direct:aggregator")
+ .filter(body().isNotNull())
+ .aggregate().header(CID_HEADER)
+ .aggregationStrategy(Application::aggregationStrategy)
+ .completionPredicate(Application::completionPredicate)
+ .aggregationRepository(getAggregationRepository())
+ .optimisticLocking()
+ .log(LoggingLevel.INFO, "Result: ${body}");
+ }
+ });
+
+ camel.start();
+ LOG.debug("Camel started");
+ LATCH.await();
+ camel.stop();
+ LOG.debug("Camel stopped");
+ } catch (Exception e) {
+ LOG.error("Failed to start Camel: {}", e.getMessage());
+ }
+ }
+
+ private static AggregationRepository getAggregationRepository() {
+ SingleConnectionDataSource ds = new SingleConnectionDataSource(DB_URL, DB_USER, DB_PASS, true);
+ ds.setAutoCommit(false);
+ try {
+ Connection conn = ds.getConnection();
+ conn.createStatement().execute(
+ "create table aggregation("
+ + "id varchar(255) not null primary key,"
+ + "exchange blob not null,"
+ + "version bigint not null"
+ + ")");
+ conn.createStatement().execute(
+ "create table aggregation_completed("
+ + "id varchar(255) not null primary key,"
+ + "exchange blob not null,"
+ + "version bigint not null"
+ + ")");
+ } catch (SQLException e) {
+ if (!e.getMessage().contains("already exists")) {
+ LOG.error("Database initialization failure", e);
+ }
+ }
+ DataSourceTransactionManager txManager = new DataSourceTransactionManager(ds);
+ // repositoryName (aggregation) must match tableName (aggregation, aggregation_completed)
+ JdbcAggregationRepository repo = new JdbcAggregationRepository(txManager, "aggregation", ds);
+ repo.setUseRecovery(false);
+ repo.setStoreBodyAsText(false);
+ return (AggregationRepository) repo;
+ }
+
+ private static Exchange aggregationStrategy(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+ String body = oldExchange.getIn().getBody(String.class) + "."
+ + newExchange.getIn().getBody(String.class);
+ oldExchange.getIn().setBody(body);
+ LOG.trace("Queue: {}", INPUT_QUEUE);
+ LOG.trace("Aggregation: {}", oldExchange.getIn().getBody());
+ return oldExchange;
+ }
+
+ private static boolean completionPredicate(Exchange exchange) {
+ boolean isComplete = false;
+ final String body = exchange.getIn().getBody(String.class);
+ if (body != null && !body.isEmpty()) {
+ String[] a1 = body.split("\\.");
+ String[] a2 = EXPECTED_RESULT.split("\\.");
+ if (a1.length == a2.length) {
+ Arrays.sort(a1);
+ Arrays.sort(a2);
+ isComplete = Arrays.equals(a1, a2);
+ }
+ }
+ LOG.debug("Complete? {}", isComplete);
+ return isComplete;
+ }
+
+ private static void stop(ExecutorService executor) {
+ try {
+ executor.shutdown();
+ executor.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.error("Termination interrupted");
+ } finally {
+ if (executor.isTerminated()) {
+ LOG.debug("All tasks completed");
+ } else {
+ LOG.error("Forcing shutdown of tasks");
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ static class MyProducerBean {
+ public void run(Exchange exchange) throws Exception {
+ CamelContext context = exchange.getContext();
+ ProducerTemplate template = context.createProducerTemplate();
+ template.setThreadedAsyncMode(false);
+ Endpoint endpoint = context.getEndpoint("direct:aggregator");
+ Integer item = null;
+ while ((item = INPUT_QUEUE.poll()) != null) {
+ template.sendBodyAndHeader(endpoint, item, CID_HEADER, CORRELATION_ID);
+ }
+ template.stop();
+ LATCH.countDown();
+ }
+ }
+
+}
diff --git a/examples/camel-example-aggregate-dist/src/main/resources/log4j2.properties b/examples/camel-example-aggregate-dist/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..3efa429
--- /dev/null
+++ b/examples/camel-example-aggregate-dist/src/main/resources/log4j2.properties
@@ -0,0 +1,33 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%15.15t] %highlight{%-5p} %-30.30c - %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.out.ref = out
+
+loggers = mine,camel,spring,trans
+logger.mine.name = org.apache.camel.example
+logger.mine.level = TRACE
+logger.camel.name = org.apache.camel
+logger.camel.level = WARN
+logger.spring.name = org.springframework
+logger.spring.level = WARN
+logger.trans.name = org.springframework.transaction
+logger.trans.level = WARN
diff --git a/examples/pom.xml b/examples/pom.xml
index d978ee4..b3e2f22 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -83,6 +83,7 @@
<modules>
<module>camel-example-activemq-tomcat</module>
<module>camel-example-aggregate</module>
+ <module>camel-example-aggregate-dist</module>
<module>camel-example-any23</module>
<module>camel-example-artemis</module>
<module>camel-example-artemis-amqp-blueprint</module>
@@ -196,6 +197,7 @@
<maven-compiler-plugin-version>3.8.1</maven-compiler-plugin-version>
<maven-surefire-plugin-version>3.0.0-M4</maven-surefire-plugin-version>
<maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version>
+ <exec-maven-plugin-version>1.6.0</exec-maven-plugin-version>
<metrics-cdi-version>1.3.3</metrics-cdi-version>
<mycila-license-version>3.0</mycila-license-version>
<pax-cdi-version>1.0.0</pax-cdi-version>
@@ -205,6 +207,7 @@
<tomcat-version>9.0.31</tomcat-version>
<woodstox-version>6.0.3</woodstox-version>
<xmlunit-version>1.6</xmlunit-version>
+ <derby-version>10.14.2.0</derby-version>
<!-- for symbolicName in OSGi examples we only want the artifactId, eg camel-example-sql -->
<!-- as having org.apache.camel as prefix is not needed and makes the name very long -->