Merge pull request #216 from jbonofre/KARAF-6736
[KARAF-6736] Add Camel processor
diff --git a/assembly/pom.xml b/assembly/pom.xml
index d51e8bd..96c53ce 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -91,6 +91,7 @@
<descriptor>mvn:org.apache.karaf.features/framework/${karaf.version}/xml/features</descriptor>
<descriptor>mvn:org.apache.karaf.features/standard/${karaf.version}/xml/features</descriptor>
<descriptor>mvn:org.apache.karaf.features/enterprise/${karaf.version}/xml/features</descriptor>
+ <descriptor>mvn:org.apache.camel.karaf/apache-camel/${camel.version}/xml/features</descriptor>
</descriptors>
<distribution>org.apache.karaf.features:framework</distribution>
<javase>1.8</javase>
diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index 85002d9..f8fe563 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -18,8 +18,6 @@
-->
<features name="karaf-decanter-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.4.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.4.0 http://karaf.apache.org/xmlns/features/v1.4.0">
- <repository>mvn:org.apache.camel.karaf/apache-camel/${camel.version}/xml/features</repository>
-
<feature name="decanter-common" version="${project.version}" description="Karaf Decanter API">
<config name="org.apache.felix.eventadmin.impl.EventAdmin" append="true">
org.apache.felix.eventadmin.ThreadPoolSize=40
@@ -299,6 +297,7 @@
<feature>decanter-common</feature>
<feature>camel-core</feature>
<bundle>mvn:org.apache.karaf.decanter.appender/org.apache.karaf.decanter.appender.camel/${project.version}</bundle>
+ <bundle>mvn:org.apache.karaf.decanter.appender/org.apache.karaf.decanter.appender.camel/${project.version}</bundle>
</feature>
<feature name="decanter-appender-camel" version="${project.version}" description="Karaf Decanter Camel Appender">
@@ -482,6 +481,18 @@
<feature>decanter-processor-groupby-core</feature>
</feature>
+ <feature name="decanter-processor-camel-core" version="${project.version}" description="Karaf Decanter Camel Processor core">
+ <feature>decanter-common</feature>
+ <feature>camel-core</feature>
+ <bundle dependency="true">mvn:org.apache.camel/camel-core-osgi/${camel.version}</bundle>
+ <bundle>mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.camel/${project.version}</bundle>
+ </feature>
+
+ <feature name="decanter-processor-camel" version="${project.version}" description="Karaf Decanter Camel Processor">
+ <configfile finalname="/etc/org.apache.karaf.decanter.processor.camel.cfg">mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.camel/${project.version}/cfg</configfile>
+ <feature>decanter-processor-camel-core</feature>
+ </feature>
+
<feature name="decanter-alerting-core" version="${project.version}" description="Karaf Decanter Alerting core">
<feature>decanter-common</feature>
<bundle>mvn:org.apache.karaf.decanter.alerting/org.apache.karaf.decanter.alerting.service/${project.version}</bundle>
diff --git a/itest/src/test/java/org/apache/karaf/decanter/itests/processor/CamelProcessorTest.java b/itest/src/test/java/org/apache/karaf/decanter/itests/processor/CamelProcessorTest.java
new file mode 100644
index 0000000..62bbc2d
--- /dev/null
+++ b/itest/src/test/java/org/apache/karaf/decanter/itests/processor/CamelProcessorTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.itests.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.karaf.itests.KarafTestSupport;
+import org.apache.karaf.jaas.boot.principal.RolePrincipal;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.karaf.options.KarafDistributionOption;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import java.util.*;
+import java.util.stream.Stream;
+
+/**
+ * Installs the decanter-processor-camel feature in a Karaf container, then checks that a collected event goes
+ * through a test Camel route and comes back on the {@code decanter/process/*} topics.
+ */
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class CamelProcessorTest extends KarafTestSupport {
+
+    /** Maximum time, in milliseconds, to wait for the configuration or for the processed event. */
+    private static final long TIMEOUT = 60000L;
+
+    @Configuration
+    public Option[] config() {
+        Option[] options = new Option[]{
+            KarafDistributionOption.editConfigurationFilePut("etc/system.properties", "decanter.version", System.getProperty("decanter.version")),
+            KarafDistributionOption.features("mvn:org.apache.camel.karaf/apache-camel/" + System.getProperty("camel.version") + "/xml/features", "camel-core")
+        };
+        return Stream.of(super.config(), options).flatMap(Stream::of).toArray(Option[]::new);
+    }
+
+    @Test
+    public void test() throws Exception {
+        System.out.println("Installing Decanter Processor Camel ...");
+        System.out.println(executeCommand("feature:repo-add decanter " + System.getProperty("decanter.version"), new RolePrincipal("admin")));
+        System.out.println(executeCommand("feature:install decanter-processor-camel", new RolePrincipal("admin")));
+        // bounded wait: a configuration which never shows up has to fail the test, not hang the build
+        long deadline = System.currentTimeMillis() + TIMEOUT;
+        String configList = executeCommand("config:list '(service.pid=org.apache.karaf.decanter.processor.camel)'");
+        while (!configList.contains("service.pid")) {
+            if (System.currentTimeMillis() > deadline) {
+                Assert.fail("org.apache.karaf.decanter.processor.camel configuration not found");
+            }
+            Thread.sleep(500);
+            configList = executeCommand("config:list '(service.pid=org.apache.karaf.decanter.processor.camel)'");
+        }
+
+        System.out.println("Creating test Camel route ...");
+        DefaultCamelContext camelContext = new DefaultCamelContext();
+        camelContext.setName("decanter-test-context");
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:decanter-delegate")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            Map<String, Object> body = exchange.getIn().getBody(Map.class);
+                            body.put("camel-processing", "of-course");
+                            exchange.getIn().setBody(body, Map.class);
+                        }
+                    }).to("direct-vm:decanter-callback");
+            }
+        });
+        // start() is synchronous, so there's no need to poll for the started state afterwards
+        camelContext.start();
+        try {
+            System.out.println("Adding event handler ...");
+            // filled by an event admin thread while this thread polls it, so it has to be synchronized
+            final List<Event> received = Collections.synchronizedList(new ArrayList<Event>());
+            EventHandler eventHandler = new EventHandler() {
+                @Override
+                public void handleEvent(Event event) {
+                    received.add(event);
+                }
+            };
+            Hashtable<String, Object> serviceProperties = new Hashtable<>();
+            serviceProperties.put(EventConstants.EVENT_TOPIC, "decanter/process/*");
+            bundleContext.registerService(EventHandler.class, eventHandler, serviceProperties);
+
+            System.out.println("Sending test events ...");
+            EventAdmin dispatcher = getOsgiService(EventAdmin.class);
+            Map<String, Object> data = new HashMap<>();
+            data.put("foo", "bar");
+            dispatcher.sendEvent(new Event("decanter/collect/test", data));
+
+            System.out.println("Waiting events ...");
+            deadline = System.currentTimeMillis() + TIMEOUT;
+            while (received.isEmpty()) {
+                if (System.currentTimeMillis() > deadline) {
+                    Assert.fail("No event received on decanter/process/*");
+                }
+                Thread.sleep(500);
+            }
+
+            System.out.println("");
+
+            for (int i = 0; i < received.size(); i++) {
+                for (String property : received.get(i).getPropertyNames()) {
+                    System.out.println(property + " = " + received.get(i).getProperty(property));
+                }
+                System.out.println("========");
+            }
+
+            System.out.println("");
+
+            Assert.assertEquals(1, received.size());
+
+            Assert.assertEquals("of-course", received.get(0).getProperty("camel-processing"));
+            Assert.assertEquals("camel", received.get(0).getProperty("processor"));
+            Assert.assertEquals("bar", received.get(0).getProperty("foo"));
+        } finally {
+            // always release the test context, even when an assertion fails
+            camelContext.stop();
+        }
+    }
+
+}
diff --git a/manual/src/main/asciidoc/user-guide/processors.adoc b/manual/src/main/asciidoc/user-guide/processors.adoc
index 7696ee0..55dc125 100644
--- a/manual/src/main/asciidoc/user-guide/processors.adoc
+++ b/manual/src/main/asciidoc/user-guide/processors.adoc
@@ -107,11 +107,6 @@
----
#
# Decanter GroupBy processor
-#
-
-#
-# Destination dispatcher topics where to send the aggregated events
-#
#target.topics=decanter/process/groupby
#
@@ -134,4 +129,54 @@
* The `target.topics` property defines the list of Decanter topics (separated by `,`) where the resulting events will be sent.
* The `period` property defines the retention period to accumulate the incoming events
* The `groupBy` property defines the property names (separated by `,`) as grouping term
-* The `flatten` property defines the way the resulting event will be created. If `true`, all events properties will be store directly (flat) in the resulting event. If `false`, the resulting event will contain an array of properties (from the original grouped events).
\ No newline at end of file
+* The `flatten` property defines the way the resulting event will be created. If `true`, all event properties will be stored directly (flat) in the resulting event. If `false`, the resulting event will contain an array of properties (from the original grouped events).
+
+==== Apache Camel
+
+It's also possible to implement your own event processor using Apache Camel.
+
+Decanter Camel Processor delegates event processing to your Camel route. Your route just has to call Decanter back (on a dedicated Camel endpoint)
+to send the processed event back to the dispatcher.
+
+By default, Decanter Camel processor sends the events to `direct-vm:decanter-delegate` endpoint, and
+expects the processed event back on `direct-vm:decanter-callback`.
+
+The Camel message body is `Map<String,Object>` (it's what Decanter is sending into your Camel route and expects on
+the callback endpoint).
+
+You can install the Camel processor with the `decanter-processor-camel` feature:
+
+----
+karaf@root()> feature:install decanter-processor-camel
+----
+
+This feature also installs `etc/org.apache.karaf.decanter.processor.camel.cfg` configuration file:
+
+----
+#
+# Decanter Camel processor
+#
+
+#
+# Destination dispatcher topics where to send the processed events
+#
+#target.topics=decanter/process/camel
+
+#
+# This is the Camel endpoint URI where Decanter is sending the events
+# (using event Map<String, Object> as body)
+#
+#delegate.uri=direct-vm:decanter-delegate
+
+#
+# This is the Camel endpoint URI the user Camel route should call to get "back" in Decanter.
+# The user Camel route is supposed to do "to uri=[CALLBACK]" with a Map<String, Object> body
+# resulting from the route processing.
+# Decanter uses this body to send a new Event to the dispatcher target topics.
+#
+#callback.uri=direct-vm:decanter-callback
+----
+
+* the `target.topics` property is the list of Decanter dispatcher topics (separated by `,`) where the processor will "forward" the processed events.
+* the `delegate.uri` property is the Camel endpoint URI where Decanter Camel Processor will send events (as `Map<String,Object>`). It's basically the `from` endpoint of your route.
+* the `callback.uri` property is the Camel endpoint URI where Decanter Camel Processor is waiting for your processed events. Basically, it's where your route should send processed events (`to` of your route).
diff --git a/processor/camel/pom.xml b/processor/camel/pom.xml
new file mode 100644
index 0000000..97bec73
--- /dev/null
+++ b/processor/camel/pom.xml
@@ -0,0 +1,96 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.karaf.decanter</groupId>
+ <artifactId>processor</artifactId>
+ <version>2.6.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.karaf.decanter.processor</groupId>
+ <artifactId>org.apache.karaf.decanter.processor.camel</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache Karaf :: Decanter :: Processor :: Camel</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-core-osgi</artifactId>
+ <version>${camel.version}</version>
+ </dependency>
+
+ <!-- test -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>src/main/cfg/org.apache.karaf.decanter.processor.camel.cfg</file>
+ <type>cfg</type>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>
+ org.apache.camel*;version="[2,4)",
+ *
+ </Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.processor.camel
+ </Private-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/processor/camel/src/main/cfg/org.apache.karaf.decanter.processor.camel.cfg b/processor/camel/src/main/cfg/org.apache.karaf.decanter.processor.camel.cfg
new file mode 100644
index 0000000..9f65a81
--- /dev/null
+++ b/processor/camel/src/main/cfg/org.apache.karaf.decanter.processor.camel.cfg
@@ -0,0 +1,41 @@
+################################################################################
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+################################################################################
+
+#
+# Decanter Camel processor
+#
+
+#
+# Destination dispatcher topics where to send the processed events
+#
+#target.topics=decanter/process/camel
+
+#
+# This is the Camel endpoint URI where Decanter is sending the events
+# (using event Map<String, Object> as body)
+#
+#delegate.uri=direct-vm:decanter-delegate
+
+#
+# This is the Camel endpoint URI the user Camel route should call to get "back" in Decanter.
+# The user Camel route is supposed to do "to uri=[CALLBACK]" with a Map<String, Object> body
+# resulting from the route processing.
+# Decanter uses this body to send a new Event to the dispatcher target topics.
+#
+#callback.uri=direct-vm:decanter-callback
\ No newline at end of file
diff --git a/processor/camel/src/main/java/org/apache/karaf/decanter/processor/camel/CamelProcessor.java b/processor/camel/src/main/java/org/apache/karaf/decanter/processor/camel/CamelProcessor.java
new file mode 100644
index 0000000..1f4b224
--- /dev/null
+++ b/processor/camel/src/main/java/org/apache/karaf/decanter/processor/camel/CamelProcessor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.camel;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.core.osgi.OsgiDefaultCamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.RouteDefinition;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Decanter processor delegating the processing of collected events to a user Camel route.
+ *
+ * Each event received on {@code decanter/collect/*} is sent as a {@code Map<String, Object>} body to the
+ * delegate endpoint. The map received back on the callback endpoint is tagged with {@code processor=camel}
+ * and posted to every configured target topic.
+ */
+@Component(
+    name = "org.apache.karaf.decanter.processor.camel",
+    immediate = true,
+    property = EventConstants.EVENT_TOPIC + "=decanter/collect/*"
+)
+public class CamelProcessor implements EventHandler {
+
+    @Reference
+    private EventAdmin dispatcher;
+
+    private String[] targetTopics;
+    private String callbackUri;
+    private String delegateUri;
+
+    private ModelCamelContext camelContext;
+    private ProducerTemplate producerTemplate;
+    private ServiceRegistration<CamelContext> serviceRegistration;
+
+    /**
+     * Activates the component with the configuration and bundle context of its component context.
+     *
+     * @param componentContext the context of this component.
+     * @throws Exception if the Camel context or the callback route can't be started.
+     */
+    @Activate
+    public void activate(ComponentContext componentContext) throws Exception {
+        activate(componentContext.getProperties(), componentContext.getBundleContext());
+    }
+
+    /**
+     * Reads the configuration, starts the Camel context and adds the callback route.
+     *
+     * @param configuration the processor configuration; {@code target.topics}, {@code callback.uri} and
+     *                      {@code delegate.uri} fall back to a default value when not set.
+     * @param bundleContext the bundle context, or {@code null} to use a Camel context not bound to OSGi.
+     * @throws Exception if the Camel context or the callback route can't be started.
+     */
+    public void activate(Dictionary<String, Object> configuration, BundleContext bundleContext) throws Exception {
+        // the topics don't change once activated: split them here rather than on each exchange
+        targetTopics = setting(configuration, "target.topics", "decanter/process/camel").split(",");
+        callbackUri = setting(configuration, "callback.uri", "direct-vm:decanter-callback");
+        delegateUri = setting(configuration, "delegate.uri", "direct-vm:decanter-delegate");
+
+        try {
+            if (bundleContext != null) {
+                camelContext = new OsgiDefaultCamelContext(bundleContext);
+                serviceRegistration = bundleContext.registerService(CamelContext.class, camelContext, null);
+            } else {
+                camelContext = new DefaultCamelContext();
+            }
+            camelContext.start();
+            camelContext.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from(callbackUri)
+                        .id("decanter-processor-callback")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                postProcessedEvent(exchange);
+                            }
+                        }).end();
+                }
+            });
+            // a single template serves all events; creating one per event leaks it, as it's never stopped
+            producerTemplate = camelContext.createProducerTemplate();
+        } catch (Exception e) {
+            // don't leave a half started context or a registered service behind a failed activation
+            deactivate();
+            throw e;
+        }
+    }
+
+    /**
+     * Stops the producer template and the Camel context, then unregisters the Camel context service.
+     * Each step is best effort, so a failure in one of them doesn't prevent the next ones.
+     */
+    @Deactivate
+    public void deactivate() {
+        try {
+            if (producerTemplate != null) {
+                producerTemplate.stop();
+            }
+        } catch (Exception ignored) {
+            // best effort, the Camel context is stopped anyway
+        }
+        try {
+            if (camelContext != null) {
+                camelContext.stop();
+                camelContext.removeRouteDefinitions(new ArrayList<RouteDefinition>(camelContext.getRouteDefinitions()));
+            }
+        } catch (Exception ignored) {
+            // best effort, the component is going away
+        } finally {
+            if (serviceRegistration != null) {
+                serviceRegistration.unregister();
+                serviceRegistration = null;
+            }
+        }
+    }
+
+    /**
+     * Sends the properties of the event, as a {@code Map<String, Object>} body, to the delegate endpoint.
+     *
+     * @param event the collected event.
+     */
+    @Override
+    public void handleEvent(Event event) {
+        Map<String, Object> data = new HashMap<>();
+        for (String name : event.getPropertyNames()) {
+            data.put(name, event.getProperty(name));
+        }
+        producerTemplate.sendBody(delegateUri, data);
+    }
+
+    public void setDispatcher(EventAdmin dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    /** Posts the map received on the callback endpoint, tagged with processor=camel, to the target topics. */
+    private void postProcessedEvent(Exchange exchange) {
+        Map<String, Object> body = exchange.getIn().getBody(Map.class);
+        if (body == null) {
+            throw new IllegalArgumentException("Decanter callback expects a Map<String, Object> body");
+        }
+        // work on a copy: the map handed back by the user route can be read-only
+        Map<String, Object> properties = new HashMap<>(body);
+        properties.put("processor", "camel");
+        for (String topic : targetTopics) {
+            String name = topic.trim();
+            if (!name.isEmpty()) {
+                dispatcher.postEvent(new Event(name, properties));
+            }
+        }
+    }
+
+    /** Gives the value of the key in the configuration as a string, or the fallback when not set. */
+    private static String setting(Dictionary<String, Object> configuration, String key, String fallback) {
+        Object value = configuration.get(key);
+        return (value != null) ? value.toString() : fallback;
+    }
+
+}
diff --git a/processor/camel/src/test/java/org/apache/karaf/decanter/processor/camel/CamelProcessorTest.java b/processor/camel/src/test/java/org/apache/karaf/decanter/processor/camel/CamelProcessorTest.java
new file mode 100644
index 0000000..45fcbc1
--- /dev/null
+++ b/processor/camel/src/test/java/org/apache/karaf/decanter/processor/camel/CamelProcessorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.camel;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.util.*;
+
+/**
+ * Checks that the {@link CamelProcessor} sends an event to the delegate endpoint and posts the map
+ * received back on the callback endpoint to the configured target topic.
+ */
+public class CamelProcessorTest {
+
+    private DefaultCamelContext camelContext;
+    private CamelProcessor processor;
+
+    /** Starts the "client" route: it flags the delegated body as processed and calls Decanter back. */
+    @Before
+    public void setup() throws Exception {
+        camelContext = new DefaultCamelContext();
+        camelContext.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct-vm:decanter-delegate")
+                    .id("client-route")
+                    .log("Delegate body: ${body}")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            Map<String, Object> body = exchange.getIn().getBody(Map.class);
+                            body.put("processed", "yes");
+                            exchange.getIn().setBody(body, Map.class);
+                        }
+                    })
+                    .log("Callback body: ${body}")
+                    .to("direct-vm:decanter-callback");
+            }
+        });
+        camelContext.start();
+    }
+
+    /** Stops the processor (when the test got to create it), then the client route. */
+    @After
+    public void teardown() throws Exception {
+        if (processor != null) {
+            processor.deactivate();
+        }
+        camelContext.stop();
+    }
+
+    /** Sends one event through the processor and checks what is posted to the dispatcher. */
+    @Test
+    public void test() throws Exception {
+        DispatcherMock dispatcher = new DispatcherMock();
+        processor = new CamelProcessor();
+        processor.setDispatcher(dispatcher);
+        Hashtable<String, Object> configuration = new Hashtable<>();
+        // the key read by the processor is "target.topics": with "target.topic" the default topic is used
+        configuration.put("target.topics", "decanter/process/test");
+        configuration.put("delegate.uri", "direct-vm:decanter-delegate");
+        configuration.put("callback.uri", "direct-vm:decanter-callback");
+        processor.activate(configuration, null);
+
+        Map<String, Object> data = new HashMap<>();
+        data.put("foo", "bar");
+        Event event = new Event("decanter/collector/test", data);
+        processor.handleEvent(event);
+
+        Assert.assertEquals(1, dispatcher.postedEvents.size());
+        Assert.assertTrue(dispatcher.sentEvents.isEmpty());
+
+        Event posted = dispatcher.postedEvents.get(0);
+        Assert.assertEquals("decanter/process/test", posted.getTopic());
+        Assert.assertEquals("yes", posted.getProperty("processed"));
+        Assert.assertEquals("camel", posted.getProperty("processor"));
+        Assert.assertEquals("bar", posted.getProperty("foo"));
+    }
+
+    /** Dispatcher recording the events it's asked to post or send. */
+    static class DispatcherMock implements EventAdmin {
+
+        public List<Event> postedEvents = new ArrayList<>();
+        public List<Event> sentEvents = new ArrayList<>();
+
+        @Override
+        public void postEvent(Event event) {
+            postedEvents.add(event);
+        }
+
+        @Override
+        public void sendEvent(Event event) {
+            sentEvents.add(event);
+        }
+    }
+
+}
diff --git a/processor/pom.xml b/processor/pom.xml
index dd95953..c353e94 100644
--- a/processor/pom.xml
+++ b/processor/pom.xml
@@ -37,6 +37,7 @@
<module>passthrough</module>
<module>aggregate</module>
<module>groupby</module>
+ <module>camel</module>
</modules>
</project>
\ No newline at end of file