Merge pull request #216 from jbonofre/KARAF-6736

[KARAF-6736] Add Camel processor
diff --git a/assembly/pom.xml b/assembly/pom.xml
index d51e8bd..96c53ce 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -91,6 +91,7 @@
                                 <descriptor>mvn:org.apache.karaf.features/framework/${karaf.version}/xml/features</descriptor>
                                 <descriptor>mvn:org.apache.karaf.features/standard/${karaf.version}/xml/features</descriptor>
                                 <descriptor>mvn:org.apache.karaf.features/enterprise/${karaf.version}/xml/features</descriptor>
+                                <descriptor>mvn:org.apache.camel.karaf/apache-camel/${camel.version}/xml/features</descriptor>
                             </descriptors>
                             <distribution>org.apache.karaf.features:framework</distribution>
                             <javase>1.8</javase>
diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index 85002d9..f8fe563 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -18,8 +18,6 @@
 -->
 <features name="karaf-decanter-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.4.0 http://karaf.apache.org/xmlns/features/v1.4.0">
 
-    <repository>mvn:org.apache.camel.karaf/apache-camel/${camel.version}/xml/features</repository>
-
     <feature name="decanter-common" version="${project.version}" description="Karaf Decanter API">
         <config name="org.apache.felix.eventadmin.impl.EventAdmin" append="true">
 org.apache.felix.eventadmin.ThreadPoolSize=40
@@ -299,6 +297,7 @@
         <feature>decanter-common</feature>
         <feature>camel-core</feature>
         <bundle>mvn:org.apache.karaf.decanter.appender/org.apache.karaf.decanter.appender.camel/${project.version}</bundle>
+        <bundle>mvn:org.apache.karaf.decanter.appender/org.apache.karaf.decanter.appender.camel/${project.version}</bundle>
     </feature>
     
     <feature name="decanter-appender-camel" version="${project.version}" description="Karaf Decanter Camel Appender">
@@ -482,6 +481,18 @@
         <feature>decanter-processor-groupby-core</feature>
     </feature>
 
+    <feature name="decanter-processor-camel-core" version="${project.version}" description="Karaf Decanter Camel Processor core">
+        <feature>decanter-common</feature>
+        <feature>camel-core</feature>
+        <bundle dependency="true">mvn:org.apache.camel/camel-core-osgi/${camel.version}</bundle>
+        <bundle>mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.camel/${project.version}</bundle>
+    </feature>
+
+    <feature name="decanter-processor-camel" version="${project.version}" description="Karaf Decanter Camel Processor">
+        <configfile finalname="/etc/org.apache.karaf.decanter.processor.camel.cfg">mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.camel/${project.version}/cfg</configfile>
+        <feature>decanter-processor-camel-core</feature>
+    </feature>
+
     <feature name="decanter-alerting-core" version="${project.version}" description="Karaf Decanter Alerting core">
         <feature>decanter-common</feature>
         <bundle>mvn:org.apache.karaf.decanter.alerting/org.apache.karaf.decanter.alerting.service/${project.version}</bundle>
diff --git a/itest/src/test/java/org/apache/karaf/decanter/itests/processor/CamelProcessorTest.java b/itest/src/test/java/org/apache/karaf/decanter/itests/processor/CamelProcessorTest.java
new file mode 100644
index 0000000..62bbc2d
--- /dev/null
+++ b/itest/src/test/java/org/apache/karaf/decanter/itests/processor/CamelProcessorTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.itests.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.karaf.itests.KarafTestSupport;
+import org.apache.karaf.jaas.boot.principal.RolePrincipal;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.karaf.options.KarafDistributionOption;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Stream;
+
+/**
+ * Integration test installing the {@code decanter-processor-camel} feature in a Karaf container and
+ * checking that a collected event goes through a user Camel route before reaching the dispatcher.
+ */
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class CamelProcessorTest extends KarafTestSupport {
+
+    private static final String CONFIG_LIST_COMMAND = "config:list '(service.pid=org.apache.karaf.decanter.processor.camel)'";
+
+    /** Upper bound for each polling loop, so a broken setup fails the test instead of hanging the build. */
+    private static final long TIMEOUT_MILLIS = 120000L;
+
+    /**
+     * Provides the container options: the default Karaf test options plus the Decanter version
+     * and the Camel features repository.
+     */
+    @Configuration
+    public Option[] config() {
+        Option[] options = new Option[]{
+                KarafDistributionOption.editConfigurationFilePut("etc/system.properties", "decanter.version", System.getProperty("decanter.version")),
+                KarafDistributionOption.features("mvn:org.apache.camel.karaf/apache-camel/" + System.getProperty("camel.version") + "/xml/features", "camel-core")
+        };
+        return Stream.of(super.config(), options).flatMap(Stream::of).toArray(Option[]::new);
+    }
+
+    /**
+     * Sends an event on {@code decanter/collect/test} and expects it back on {@code decanter/process/*},
+     * enriched by both the test route and the Camel processor.
+     */
+    @Test
+    public void test() throws Exception {
+        System.out.println("Installing Decanter Processor Camel ...");
+        System.out.println(executeCommand("feature:repo-add decanter " + System.getProperty("decanter.version"), new RolePrincipal("admin")));
+        System.out.println(executeCommand("feature:install decanter-processor-camel", new RolePrincipal("admin")));
+        long configDeadline = System.currentTimeMillis() + TIMEOUT_MILLIS;
+        while (!executeCommand(CONFIG_LIST_COMMAND).contains("service.pid")) {
+            if (System.currentTimeMillis() > configDeadline) {
+                Assert.fail("Camel processor configuration has not been created in time");
+            }
+            Thread.sleep(500);
+        }
+
+        System.out.println("Creating test Camel route ...");
+        DefaultCamelContext camelContext = new DefaultCamelContext();
+        camelContext.setName("decanter-test-context");
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:decanter-delegate")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                Map<String, Object> body = exchange.getIn().getBody(Map.class);
+                                body.put("camel-processing", "of-course");
+                                exchange.getIn().setBody(body, Map.class);
+                            }
+                        }).to("direct-vm:decanter-callback");
+            }
+        });
+        // start() is synchronous: the context is started (or has thrown) when it returns
+        camelContext.start();
+
+        ServiceRegistration<EventHandler> registration = null;
+        try {
+            System.out.println("Adding event handler ...");
+            // events are posted asynchronously by the processor, so the list is filled from another thread
+            final List<Event> received = new CopyOnWriteArrayList<>();
+            EventHandler eventHandler = new EventHandler() {
+                @Override
+                public void handleEvent(Event event) {
+                    received.add(event);
+                }
+            };
+            Hashtable<String, Object> serviceProperties = new Hashtable<>();
+            serviceProperties.put(EventConstants.EVENT_TOPIC, "decanter/process/*");
+            registration = bundleContext.registerService(EventHandler.class, eventHandler, serviceProperties);
+
+            System.out.println("Sending test events ...");
+            EventAdmin dispatcher = getOsgiService(EventAdmin.class);
+            Map<String, Object> data = new HashMap<>();
+            data.put("foo", "bar");
+            dispatcher.sendEvent(new Event("decanter/collect/test", data));
+
+            System.out.println("Waiting events ...");
+            long eventDeadline = System.currentTimeMillis() + TIMEOUT_MILLIS;
+            while (received.isEmpty()) {
+                if (System.currentTimeMillis() > eventDeadline) {
+                    Assert.fail("No processed event received in time");
+                }
+                Thread.sleep(500);
+            }
+
+            System.out.println("");
+
+            for (Event event : received) {
+                for (String property : event.getPropertyNames()) {
+                    System.out.println(property + " = " + event.getProperty(property));
+                }
+                System.out.println("========");
+            }
+
+            System.out.println("");
+
+            Assert.assertEquals(1, received.size());
+
+            Assert.assertEquals("of-course", received.get(0).getProperty("camel-processing"));
+            Assert.assertEquals("camel", received.get(0).getProperty("processor"));
+            Assert.assertEquals("bar", received.get(0).getProperty("foo"));
+        } finally {
+            // release the handler and the test route whatever the outcome, the container is shared per class
+            if (registration != null) {
+                registration.unregister();
+            }
+            camelContext.stop();
+        }
+    }
+
+}
diff --git a/manual/src/main/asciidoc/user-guide/processors.adoc b/manual/src/main/asciidoc/user-guide/processors.adoc
index 7696ee0..55dc125 100644
--- a/manual/src/main/asciidoc/user-guide/processors.adoc
+++ b/manual/src/main/asciidoc/user-guide/processors.adoc
@@ -107,11 +107,6 @@
 ----
 #
 # Decanter GroupBy processor
-#
-
-#
-# Destination dispatcher topics where to send the aggregated events
-#
 #target.topics=decanter/process/groupby
 
 #
@@ -134,4 +129,54 @@
 * The `target.topics` property defines the list of Decanter topics (separated by `,`) where the resulting events will be sent.
 * The `period` property defines the retention period to accumulate the incoming events
 * The `groupBy` property defines the property names (separated by `,`) as grouping term
-* The `flatten` property defines the way the resulting event will be created. If `true`, all events properties will be store directly (flat) in the resulting event. If `false`, the resulting event will contain an array of properties (from the original grouped events).
\ No newline at end of file
+* The `flatten` property defines the way the resulting event will be created. If `true`, all events properties will be stored directly (flat) in the resulting event. If `false`, the resulting event will contain an array of properties (from the original grouped events).
+
+==== Apache Camel
+
+It's also possible to implement your own event processor using Apache Camel.
+
+Decanter Camel Processor delegates event processing to your Camel route. Your route just has to call Decanter back (on a dedicated Camel endpoint)
+to send the processed event back to the dispatcher.
+
+By default, Decanter Camel processor sends the events to the `direct-vm:decanter-delegate` endpoint, and
+expects the processed event back on `direct-vm:decanter-callback`.
+
+The Camel message body is `Map<String,Object>` (it's what Decanter is sending into your Camel route and expects on
+the callback endpoint).
+
+You can install the Camel processor with the `decanter-processor-camel` feature:
+
+----
+karaf@root()> feature:install decanter-processor-camel
+----
+
+This feature also installs `etc/org.apache.karaf.decanter.processor.camel.cfg` configuration file:
+
+----
+#
+# Decanter Camel processor
+#
+
+#
+# Destination dispatcher topics where to send the processed events
+#
+#target.topics=decanter/process/camel
+
+#
+# This is the Camel endpoint URI where Decanter is sending the events
+# (using event Map<String, Object> as body)
+#
+#delegate.uri=direct-vm:decanter-delegate
+
+#
+# This is the Camel endpoint URI where user Camel route should call to be "back" in Decanter
+# The user Camel route is supposed to do "to uri=[CALLBACK]" with a Map<String, Object> body
+# resulting of the route processing.
+# Decanter uses this body to send a new Event to the dispatcher target topics.
+#
+#callback.uri=direct-vm:decanter-callback
+----
+
+* the `target.topics` property is the list of Decanter dispatcher topics (separated by `,`) where the processor will "forward" the processed events.
+* the `delegate.uri` property is the Camel endpoint URI where Decanter Camel Processor will send events (as `Map<String,Object>`). It's basically the `from` endpoint of your route.
+* the `callback.uri` property is the Camel endpoint URI where Decanter Camel Processor is waiting for your processed events. Basically, it's where your route should send processed events (`to` of your route).
diff --git a/processor/camel/pom.xml b/processor/camel/pom.xml
new file mode 100644
index 0000000..97bec73
--- /dev/null
+++ b/processor/camel/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <!--
+
+        Licensed to the Apache Software Foundation (ASF) under one or more
+        contributor license agreements.  See the NOTICE file distributed with
+        this work for additional information regarding copyright ownership.
+        The ASF licenses this file to You under the Apache License, Version 2.0
+        (the "License"); you may not use this file except in compliance with
+        the License.  You may obtain a copy of the License at
+
+           http://www.apache.org/licenses/LICENSE-2.0
+
+        Unless required by applicable law or agreed to in writing, software
+        distributed under the License is distributed on an "AS IS" BASIS,
+        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+        See the License for the specific language governing permissions and
+        limitations under the License.
+    -->
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.karaf.decanter</groupId>
+        <artifactId>processor</artifactId>
+        <version>2.6.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.karaf.decanter.processor</groupId>
+    <artifactId>org.apache.karaf.decanter.processor.camel</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache Karaf :: Decanter :: Processor :: Camel</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core-osgi</artifactId>
+            <version>${camel.version}</version>
+        </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>src/main/cfg/org.apache.karaf.decanter.processor.camel.cfg</file>
+                                    <type>cfg</type>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>
+                            org.apache.camel*;version="[2,4)",
+                            *
+                        </Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.processor.camel
+                        </Private-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/processor/camel/src/main/cfg/org.apache.karaf.decanter.processor.camel.cfg b/processor/camel/src/main/cfg/org.apache.karaf.decanter.processor.camel.cfg
new file mode 100644
index 0000000..9f65a81
--- /dev/null
+++ b/processor/camel/src/main/cfg/org.apache.karaf.decanter.processor.camel.cfg
@@ -0,0 +1,41 @@
+################################################################################
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+################################################################################
+
+#
+# Decanter Camel processor
+#
+
+#
+# Destination dispatcher topics where to send the processed events
+#
+#target.topics=decanter/process/camel
+
+#
+# This is the Camel endpoint URI where Decanter is sending the events
+# (using event Map<String, Object> as body)
+#
+#delegate.uri=direct-vm:decanter-delegate
+
+#
+# This is the Camel endpoint URI where user Camel route should call to be "back" in Decanter
+# The user Camel route is supposed to do "to uri=[CALLBACK]" with a Map<String, Object> body
+# resulting of the route processing.
+# Decanter uses this body to send a new Event to the dispatcher target topics.
+#
+#callback.uri=direct-vm:decanter-callback
\ No newline at end of file
diff --git a/processor/camel/src/main/java/org/apache/karaf/decanter/processor/camel/CamelProcessor.java b/processor/camel/src/main/java/org/apache/karaf/decanter/processor/camel/CamelProcessor.java
new file mode 100644
index 0000000..1f4b224
--- /dev/null
+++ b/processor/camel/src/main/java/org/apache/karaf/decanter/processor/camel/CamelProcessor.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.camel;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.core.osgi.OsgiDefaultCamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+
+@Component(
+        name = "org.apache.karaf.decanter.processor.camel",
+        immediate = true,
+        property = EventConstants.EVENT_TOPIC + "=decanter/collect/*"
+)
+public class CamelProcessor implements EventHandler {
+
+    @Reference
+    private EventAdmin dispatcher;
+
+    private String targetTopics;
+    private String callbackUri;
+    private String delegateUri;
+
+    private ModelCamelContext camelContext;
+    private ServiceRegistration<CamelContext> serviceRegistration;
+
+    @Activate
+    public void activate(ComponentContext componentContext) throws Exception {
+        activate(componentContext.getProperties(), componentContext.getBundleContext());
+    }
+
+    public void activate(Dictionary<String, Object> configuration, BundleContext bundleContext) throws Exception {
+        targetTopics = (configuration.get("target.topics") != null) ? configuration.get("target.topics").toString() : "decanter/process/camel";
+        callbackUri = (configuration.get("callback.uri") != null) ? configuration.get("callback.uri").toString() : "direct-vm:decanter-callback";
+        delegateUri = (configuration.get("delegate.uri") != null) ? configuration.get("delegate.uri").toString() : "direct-vm:decanter-delegate";
+
+        if (bundleContext != null) {
+            camelContext = new OsgiDefaultCamelContext(bundleContext);
+            serviceRegistration = bundleContext.registerService(CamelContext.class, camelContext, null);
+        } else {
+            camelContext = new DefaultCamelContext();
+        }
+        camelContext.start();
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(callbackUri)
+                        .id("decanter-processor-callback")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                Map<String, Object> body = exchange.getIn().getBody(Map.class);
+                                body.put("processor", "camel");
+                                String[] topics = targetTopics.split(",");
+                                for (String topic : topics) {
+                                    dispatcher.postEvent(new Event(topic, body));
+                                }
+                            }
+                        }).end();
+            }
+        });
+    }
+
+    @Deactivate
+    public void deactivate() {
+        try {
+            camelContext.stop();
+            camelContext.removeRouteDefinitions(new ArrayList<RouteDefinition>(camelContext.getRouteDefinitions()));
+            if (serviceRegistration != null) {
+                serviceRegistration.unregister();
+            }
+        } catch (Exception e) {
+            // no-op
+        }
+    }
+
+    @Override
+    public void handleEvent(Event event) {
+        HashMap<String, Object> data = new HashMap<>();
+        for (String name : event.getPropertyNames()) {
+            data.put(name, event.getProperty(name));
+        }
+        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
+        producerTemplate.sendBody(delegateUri, data);
+    }
+
+    public void setDispatcher(EventAdmin dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+}
diff --git a/processor/camel/src/test/java/org/apache/karaf/decanter/processor/camel/CamelProcessorTest.java b/processor/camel/src/test/java/org/apache/karaf/decanter/processor/camel/CamelProcessorTest.java
new file mode 100644
index 0000000..45fcbc1
--- /dev/null
+++ b/processor/camel/src/test/java/org/apache/karaf/decanter/processor/camel/CamelProcessorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.camel;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Checks that an event handled by the processor goes through a user route and is posted to the dispatcher.
+ */
+public class CamelProcessorTest {
+
+    private DefaultCamelContext camelContext;
+
+    /** Starts the "user" route: it flags the body as processed and calls Decanter back. */
+    @Before
+    public void setup() throws Exception {
+        camelContext = new DefaultCamelContext();
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:decanter-delegate")
+                        .id("client-route")
+                        .log("Delegate body: ${body}")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                Map<String, Object> body = exchange.getIn().getBody(Map.class);
+                                body.put("processed", "yes");
+                                exchange.getIn().setBody(body, Map.class);
+                            }
+                        })
+                        .log("Callback body: ${body}")
+                        .to("direct-vm:decanter-callback");
+            }
+        });
+        camelContext.start();
+    }
+
+    @After
+    public void teardown() throws Exception {
+        camelContext.stop();
+    }
+
+    @Test
+    public void test() throws Exception {
+        DispatcherMock dispatcher = new DispatcherMock();
+        CamelProcessor processor = new CamelProcessor();
+        processor.setDispatcher(dispatcher);
+        Hashtable<String, Object> configuration = new Hashtable<>();
+        // the processor reads "target.topics"; any other key is silently ignored
+        configuration.put("target.topics", "decanter/process/test");
+        configuration.put("delegate.uri", "direct-vm:decanter-delegate");
+        configuration.put("callback.uri", "direct-vm:decanter-callback");
+        processor.activate(configuration, null);
+        try {
+            Map<String, Object> data = new HashMap<>();
+            data.put("foo", "bar");
+            Event event = new Event("decanter/collect/test", data);
+            processor.handleEvent(event);
+
+            Assert.assertEquals(1, dispatcher.postedEvents.size());
+
+            Event posted = dispatcher.postedEvents.get(0);
+            Assert.assertEquals("decanter/process/test", posted.getTopic());
+            Assert.assertEquals("yes", posted.getProperty("processed"));
+            Assert.assertEquals("camel", posted.getProperty("processor"));
+            Assert.assertEquals("bar", posted.getProperty("foo"));
+        } finally {
+            // direct-vm endpoints are shared JVM wide: release the callback consumer for other tests
+            processor.deactivate();
+        }
+    }
+
+    /** Dispatcher recording the events instead of delivering them. */
+    static class DispatcherMock implements EventAdmin {
+
+        public final List<Event> postedEvents = new ArrayList<>();
+        public final List<Event> sentEvents = new ArrayList<>();
+
+        @Override
+        public void postEvent(Event event) {
+            postedEvents.add(event);
+        }
+
+        @Override
+        public void sendEvent(Event event) {
+            sentEvents.add(event);
+        }
+    }
+
+}
diff --git a/processor/pom.xml b/processor/pom.xml
index dd95953..c353e94 100644
--- a/processor/pom.xml
+++ b/processor/pom.xml
@@ -37,6 +37,7 @@
         <module>passthrough</module>
         <module>aggregate</module>
         <module>groupby</module>
+        <module>camel</module>
     </modules>
 
 </project>
\ No newline at end of file