JdbcAggregationRepository optimistic locking example
diff --git a/examples/README.adoc b/examples/README.adoc
index 27c6321..faf3ef4 100644
--- a/examples/README.adoc
+++ b/examples/README.adoc
@@ -76,6 +76,8 @@
 
 | link:camel-example-aggregate/README.adoc[Aggregate] (camel-example-aggregate) | EIP | Demonstrates the persistent support for the Camel aggregator
 
+| link:camel-example-aggregate-dist/README.adoc[Aggregate Distributed] (camel-example-aggregate-dist) | EIP | How to use the JdbcAggregationRepository in a distributed environment
+
 | link:camel-example-artemis/README.adoc[Widget Gadget using Apache ActiveMQ Artemis] (camel-example-artemis) | EIP | The widget and gadget example from the EIP book using Apache ActiveMQ Artemis
 
 | link:camel-example-bigxml-split/README.adoc[Bigxml Split] (camel-example-bigxml-split) | EIP | How to deal with big XML files in Camel
diff --git a/examples/camel-example-aggregate-dist/README.adoc b/examples/camel-example-aggregate-dist/README.adoc
new file mode 100644
index 0000000..05ba456
--- /dev/null
+++ b/examples/camel-example-aggregate-dist/README.adoc
@@ -0,0 +1,24 @@
+== Camel Persistent Aggregate
+
+=== Introduction
+
+This example shows how to use Camel JDBC Aggregator optimistic locking feature
+in a distributed environment (multiple independent Camel Contexts).
+
+=== Build and run
+
+You can build and run the example with this simple command:
+
+....
+mvn clean compile exec:java
+....
+
+=== Help and contributions
+
+If you hit any problem using Camel or have some feedback, then please
+https://camel.apache.org/support.html[let us know].
+
+We also love contributors, so
+https://camel.apache.org/contributing.html[get involved] :-)
+
+The Camel riders!
diff --git a/examples/camel-example-aggregate-dist/pom.xml b/examples/camel-example-aggregate-dist/pom.xml
new file mode 100644
index 0000000..6ee787b
--- /dev/null
+++ b/examples/camel-example-aggregate-dist/pom.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel.example</groupId>
+        <artifactId>examples</artifactId>
+        <version>3.4.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-example-aggregate-dist</artifactId>
+    <packaging>jar</packaging>
+    <name>Camel :: Example :: Aggregate Distributed</name>
+    <description>How to use the JdbcAggregationRepository in a distributed environment</description>
+
+    <properties>
+        <category>EIP</category>
+        <camel.osgi.export.pkg>org.apache.camel.example.*</camel.osgi.export.pkg>
+    </properties>
+
+    <dependencyManagement>
+        <dependencies>
+            <!-- Add Camel BOM -->
+            <dependency>
+                <groupId>org.apache.camel</groupId>
+                <artifactId>camel-bom</artifactId>
+                <version>${camel.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-main</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core-engine</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-timer</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-direct</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-spring</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-bean</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-sql</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.derby</groupId>
+            <artifactId>derby</artifactId>
+            <version>${derby-version}</version>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${log4j2-version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${log4j2-version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <version>${log4j2-version}</version>
+            <scope>runtime</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- mvn clean compile exec:java -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>1.6.0</version>
+                <configuration>
+                    <mainClass>org.apache.camel.example.Application</mainClass>
+                    <includePluginDependencies>false</includePluginDependencies>
+                    <cleanupDaemonThreads>false</cleanupDaemonThreads>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/examples/camel-example-aggregate-dist/src/main/java/org/apache/camel/example/Application.java b/examples/camel-example-aggregate-dist/src/main/java/org/apache/camel/example/Application.java
new file mode 100644
index 0000000..1bcd18c
--- /dev/null
+++ b/examples/camel-example-aggregate-dist/src/main/java/org/apache/camel/example/Application.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.example;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.main.Main;
+import org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository;
+import org.apache.camel.spi.AggregationRepository;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository.OptimisticLockingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.datasource.SingleConnectionDataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class Application {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Application.class);
+
+    protected static final int THREADS = 20;
+    protected static final int END = 100;
+
+    private static final String CID_HEADER = "corrId";
+    private static final String DB_URL = "jdbc:derby:target/testdb;create=true";
+    private static final String DB_USER = "admin";
+    private static final String DB_PASS = "admin";
+
+    private static String CORRELATION_ID, EXPECTED_RESULT;
+    private static Queue<Integer> INPUT_QUEUE;
+    private static CountDownLatch LATCH;
+
+    public static void main(String[] args) throws Exception {
+        // init
+        CORRELATION_ID = UUID.randomUUID().toString();
+        EXPECTED_RESULT = IntStream.rangeClosed(1, END)
+                .mapToObj(Integer::toString).collect(Collectors.joining("."));
+        INPUT_QUEUE = new ConcurrentLinkedQueue<>();
+        IntStream.rangeClosed(1, END).forEach(INPUT_QUEUE::add);
+        LATCH = new CountDownLatch(THREADS);
+
+        // test
+        ExecutorService executor = Executors.newFixedThreadPool(THREADS);
+        for (int i = 0; i < THREADS; i++) {
+            executor.execute(Application::startCamel);
+        }
+
+        // wait
+        LATCH.await();
+        stop(executor);
+    }
+
+    private static void startCamel() {
+        try {
+            Main camel = new Main();
+            camel.configure().addRoutesBuilder(new RouteBuilder() {
+                @Override
+                public void configure() {
+                    from("timer:foo?repeatCount=1&period=1")
+                            .setExchangePattern(ExchangePattern.InOnly)
+                            .bean(new MyProducerBean());
+
+                    from("direct:aggregator")
+                            .filter(body().isNotNull())
+                            .aggregate().header(CID_HEADER)
+                            .aggregationStrategy(Application::aggregationStrategy)
+                            .completionPredicate(Application::completionPredicate)
+                            .aggregationRepository(getAggregationRepository())
+                            .optimisticLocking()
+                            .log(LoggingLevel.INFO, "Result: ${body}");
+                }
+            });
+
+            camel.start();
+            LOG.debug("Camel started");
+            LATCH.await();
+            camel.stop();
+            LOG.debug("Camel stopped");
+        } catch (Exception e) {
+            LOG.error("Failed to start Camel: {}", e.getMessage());
+        }
+    }
+
+    private static AggregationRepository getAggregationRepository() {
+        SingleConnectionDataSource ds = new SingleConnectionDataSource(DB_URL, DB_USER, DB_PASS, true);
+        ds.setAutoCommit(false);
+        try {
+            Connection conn = ds.getConnection();
+            conn.createStatement().execute(
+                    "create table aggregation("
+                            + "id varchar(255) not null primary key,"
+                            + "exchange blob not null,"
+                            + "version bigint not null"
+                            + ")");
+            conn.createStatement().execute(
+                    "create table aggregation_completed("
+                            + "id varchar(255) not null primary key,"
+                            + "exchange blob not null,"
+                            + "version bigint not null"
+                            + ")");
+        } catch (SQLException e) {
+            if (!e.getMessage().contains("already exists")) {
+                LOG.error("Database initialization failure", e);
+            }
+        }
+        DataSourceTransactionManager txManager = new DataSourceTransactionManager(ds);
+        // repositoryName (aggregation) must match tableName (aggregation, aggregation_completed)
+        JdbcAggregationRepository repo = new JdbcAggregationRepository(txManager, "aggregation", ds);
+        repo.setUseRecovery(false);
+        repo.setStoreBodyAsText(false);
+        return (AggregationRepository) repo;
+    }
+
+    private static Exchange aggregationStrategy(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            return newExchange;
+        }
+        String body = oldExchange.getIn().getBody(String.class) + "."
+                + newExchange.getIn().getBody(String.class);
+        oldExchange.getIn().setBody(body);
+        LOG.trace("Queue: {}", INPUT_QUEUE);
+        LOG.trace("Aggregation: {}", oldExchange.getIn().getBody());
+        return oldExchange;
+    }
+
+    private static boolean completionPredicate(Exchange exchange) {
+        boolean isComplete = false;
+        final String body = exchange.getIn().getBody(String.class);
+        if (body != null && !body.isEmpty()) {
+            String[] a1 = body.split("\\.");
+            String[] a2 = EXPECTED_RESULT.split("\\.");
+            if (a1.length == a2.length) {
+                Arrays.sort(a1);
+                Arrays.sort(a2);
+                isComplete = Arrays.equals(a1, a2);
+            }
+        }
+        LOG.debug("Complete? {}", isComplete);
+        return isComplete;
+    }
+
+    private static void stop(ExecutorService executor) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(60, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOG.error("Termination interrupted");
+        } finally {
+            if (executor.isTerminated()) {
+                LOG.debug("All tasks completed");
+            } else {
+                LOG.error("Forcing shutdown of tasks");
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    static class MyProducerBean {
+        public void run(Exchange exchange) throws Exception {
+            CamelContext context = exchange.getContext();
+            ProducerTemplate template = context.createProducerTemplate();
+            template.setThreadedAsyncMode(false);
+            Endpoint endpoint = context.getEndpoint("direct:aggregator");
+            Integer item = null;
+            while ((item = INPUT_QUEUE.poll()) != null) {
+                template.sendBodyAndHeader(endpoint, item, CID_HEADER, CORRELATION_ID);
+            }
+            template.stop();
+            LATCH.countDown();
+        }
+    }
+
+}
diff --git a/examples/camel-example-aggregate-dist/src/main/resources/log4j2.properties b/examples/camel-example-aggregate-dist/src/main/resources/log4j2.properties
new file mode 100644
index 0000000..3efa429
--- /dev/null
+++ b/examples/camel-example-aggregate-dist/src/main/resources/log4j2.properties
@@ -0,0 +1,33 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%15.15t] %highlight{%-5p} %-30.30c - %m%n
+rootLogger.level = INFO
+rootLogger.appenderRef.out.ref = out
+
+loggers = mine,camel,spring,trans
+logger.mine.name = org.apache.camel.example
+logger.mine.level = TRACE
+logger.camel.name = org.apache.camel
+logger.camel.level = WARN
+logger.spring.name = org.springframework
+logger.spring.level = WARN
+logger.trans.name = org.springframework.transaction
+logger.trans.level = WARN
diff --git a/examples/pom.xml b/examples/pom.xml
index d978ee4..b3e2f22 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -83,6 +83,7 @@
     <modules>
         <module>camel-example-activemq-tomcat</module>
         <module>camel-example-aggregate</module>
+        <module>camel-example-aggregate-dist</module>
         <module>camel-example-any23</module>
         <module>camel-example-artemis</module>
         <module>camel-example-artemis-amqp-blueprint</module>
@@ -196,6 +197,7 @@
         <maven-compiler-plugin-version>3.8.1</maven-compiler-plugin-version>
         <maven-surefire-plugin-version>3.0.0-M4</maven-surefire-plugin-version>
         <maven-javadoc-plugin-version>3.0.1</maven-javadoc-plugin-version>
+        <exec-maven-plugin-version>1.6.0</exec-maven-plugin-version>
         <metrics-cdi-version>1.3.3</metrics-cdi-version>
         <mycila-license-version>3.0</mycila-license-version>
         <pax-cdi-version>1.0.0</pax-cdi-version>
@@ -205,6 +207,7 @@
         <tomcat-version>9.0.31</tomcat-version>
         <woodstox-version>6.0.3</woodstox-version>
         <xmlunit-version>1.6</xmlunit-version>
+        <derby-version>10.14.2.0</derby-version>
 
         <!-- for symbolicName in OSGi examples we only want the artifactId, eg camel-example-sql -->
         <!-- as having org.apache.camel as prefix is not needed and makes the name very long -->