Merge pull request #214 from jbonofre/KARAF-6864
[KARAF-6864] Create groupBy processor
diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index 61d72d9..80c75bf 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -462,6 +462,16 @@
<feature>decanter-processor-aggregate-core</feature>
</feature>
+ <feature name="decanter-processor-groupby-core" version="${project.version}" description="Karaf Decanter GroupBy Processor core">
+ <feature>decanter-common</feature>
+ <bundle>mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.groupby/${project.version}</bundle>
+ </feature>
+
+ <feature name="decanter-processor-groupby" version="${project.version}" description="Karaf Decanter GroupBy Processor">
+ <configfile finalname="/etc/org.apache.karaf.decanter.processor.groupby.cfg">mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.groupby/${project.version}/cfg</configfile>
+ <feature>decanter-processor-groupby-core</feature>
+ </feature>
+
<feature name="decanter-alerting-core" version="${project.version}" description="Karaf Decanter Alerting core">
<feature>decanter-common</feature>
<bundle>mvn:org.apache.karaf.decanter.alerting/org.apache.karaf.decanter.alerting.service/${project.version}</bundle>
diff --git a/itest/src/test/java/org/apache/karaf/decanter/itests/processor/GroupByProcessorTest.java b/itest/src/test/java/org/apache/karaf/decanter/itests/processor/GroupByProcessorTest.java
new file mode 100644
index 0000000..6cdc618
--- /dev/null
+++ b/itest/src/test/java/org/apache/karaf/decanter/itests/processor/GroupByProcessorTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.itests.processor;
+
+import org.apache.karaf.itests.KarafTestSupport;
+import org.apache.karaf.jaas.boot.principal.RolePrincipal;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.karaf.options.KarafDistributionOption;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.util.*;
+import java.util.stream.Stream;
+
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class GroupByProcessorTest extends KarafTestSupport {
+
+ @Configuration
+ public Option[] config() {
+ Option[] options = new Option[]{
+ KarafDistributionOption.editConfigurationFilePut("etc/system.properties", "decanter.version", System.getProperty("decanter.version"))
+ };
+ return Stream.of(super.config(), options).flatMap(Stream::of).toArray(Option[]::new);
+ }
+
+ @Test
+ public void test() throws Exception {
+ System.out.println("Installing Decanter Processor GroupBy ...");
+
+ File file = new File(System.getProperty("karaf.etc"), "org.apache.karaf.decanter.processor.groupby.cfg");
+ try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+ writer.write("groupBy=foo");
+ }
+
+ String configList = executeCommand("config:list '(service.pid=org.apache.karaf.decanter.processor.groupby)'");
+ while (!configList.contains("service.pid")) {
+ Thread.sleep(500);
+ configList = executeCommand("config:list '(service.pid=org.apache.karaf.decanter.processor.groupby)'");
+ }
+
+ System.out.println(executeCommand("feature:repo-add decanter " + System.getProperty("decanter.version"), new RolePrincipal("admin")));
+ System.out.println(executeCommand("feature:install decanter-processor-groupby", new RolePrincipal("admin")));
+
+ System.out.println("Adding test event handler ...");
+ final List<Event> received = new ArrayList<>();
+ EventHandler handler = new EventHandler() {
+ @Override
+ public void handleEvent(Event event) {
+ received.add(event);
+ }
+ };
+ Hashtable<String, String> serviceProperties = new Hashtable<>();
+ serviceProperties.put(EventConstants.EVENT_TOPIC, "decanter/process/*");
+ bundleContext.registerService(EventHandler.class, handler, serviceProperties);
+
+ System.out.println("Sending test events ...");
+ EventAdmin dispatcher = getOsgiService(EventAdmin.class);
+ Map<String, Object> data1 = new HashMap<>();
+ data1.put("foo", "bar");
+ data1.put("first", "first");
+ dispatcher.sendEvent(new Event("decanter/collect/test", data1));
+ Map<String, Object> data2 = new HashMap<>();
+ data2.put("other", "other");
+ data2.put("second", "second");
+ dispatcher.sendEvent(new Event("decanter/collect/test", data2));
+ Map<String, Object> data3 = new HashMap<>();
+ data3.put("foo", "bar");
+ data3.put("third", "third");
+ dispatcher.sendEvent(new Event("decanter/collect/test", data3));
+
+ System.out.println("Waiting events ...");
+ while (received.size() < 1) {
+ Thread.sleep(500);
+ }
+
+ System.out.println("");
+
+ for (int i = 0; i < received.size(); i++) {
+ for (String property : received.get(i).getPropertyNames()) {
+ System.out.println(property + " = " + received.get(i).getProperty(property));
+ }
+ System.out.println("========");
+ }
+
+ System.out.println("");
+
+ Assert.assertEquals(1, received.size());
+
+ }
+
+}
diff --git a/manual/src/main/asciidoc/user-guide/processors.adoc b/manual/src/main/asciidoc/user-guide/processors.adoc
index 42bf87e..7696ee0 100644
--- a/manual/src/main/asciidoc/user-guide/processors.adoc
+++ b/manual/src/main/asciidoc/user-guide/processors.adoc
@@ -52,7 +52,7 @@
By default, the "merged" event is sent every minute. You can change this using the `period` configuration.
-You can provisiong `etc/org.apache.karaf.decanter.processor.aggregate.cfg` configuration file with:
+You can provision `etc/org.apache.karaf.decanter.processor.aggregate.cfg` configuration file with:
----
period=120 # this is the period in seconds
@@ -76,4 +76,62 @@
overwrite=true
----
-Then, if a property already exist in the aggregator, its value will be overwritten by the new event value received in the aggregator.
\ No newline at end of file
+Then, if a property already exist in the aggregator, its value will be overwritten by the new event value received in the aggregator.
+
+==== GroupBy
+
+This processor "groups" events containing same properties values during a period.
+
+For instance, you configure the GroupBy processor to group events using `foo` and `bar` properties. Then you receive
+the following three events:
+
+1. first event containing: `{ "foo":"foo","bar":"bar","first":"value1" }`
+2. second event containing: `{ "hello":"world","second":"value2" }`
+3. third event containing: `{ "foo":"foo","bar":"bar","third":"value3"}`
+
+The groupBy processor will create (and send) one event containing:
+
+* if you choose to "flatten" the properties, the event will contain: `{ "foo":"foo", "bar":"bar", "first":"value1","third":"value3" }`
+* if you chosse not to "flatten" the properties, the event will contain: `{ "events":[ { "foo":"foo","bar":"bar","first":"value1" }, { "foo":"foo","bar":"bar","third":"value3" } ] }`
+
+You can install this processor using the `decanter-processor-groupby` feature:
+
+----
+karaf@root()> feature:install decanter-processor-groupby
+----
+
+By default, the "merged" event is sent every minute. You can change this using the `period` configuration.
+
+The GroupBy processor is configured via `etc/org.apache.karaf.decanter.processor.groupby.cfg` configuration file:
+
+----
+#
+# Decanter GroupBy processor
+#
+
+#
+# Destination dispatcher topics where to send the aggregated events
+#
+#target.topics=decanter/process/groupby
+
+#
+# Aggregation period in seconds
+#
+#period=60
+
+#
+# List of grouping properties
+#
+#groupBy=first,second
+
+#
+# If true, grouped events properties are flatten (all properties in the event) aka Map<String,Object>
+# If false, grouped events properties are inner grouped map aka Map<int, Map<String,Object>>
+#
+#flatten=true
+----
+
+* The `target.topics` property defines the list of Decanter topics (separated by `,`) where the resulting events will be sent.
+* The `period` property defines the retention period to accumulate the incoming events
+* The `groupBy` property defines the property names (separated by `,`) as grouping term
+* The `flatten` property defines the way the resulting event will be created. If `true`, all events properties will be store directly (flat) in the resulting event. If `false`, the resulting event will contain an array of properties (from the original grouped events).
\ No newline at end of file
diff --git a/processor/groupby/pom.xml b/processor/groupby/pom.xml
new file mode 100644
index 0000000..f652aad
--- /dev/null
+++ b/processor/groupby/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.karaf.decanter</groupId>
+ <artifactId>processor</artifactId>
+ <version>2.6.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.karaf.decanter.processor</groupId>
+ <artifactId>org.apache.karaf.decanter.processor.groupby</artifactId>
+ <packaging>bundle</packaging>
+ <name>Apache Karaf :: Decanter :: Processor :: GroupBy</name>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+ <file>src/main/cfg/org.apache.karaf.decanter.processor.groupby.cfg</file>
+ <type>cfg</type>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <inherited>true</inherited>
+ <extensions>true</extensions>
+ <configuration>
+ <obrRepository>NONE</obrRepository>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Export-Package>!*</Export-Package>
+ <Import-Package>*</Import-Package>
+ <Private-Package>
+ org.apache.karaf.decanter.processor.groupby
+ </Private-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/processor/groupby/src/main/cfg/org.apache.karaf.decanter.processor.groupby.cfg b/processor/groupby/src/main/cfg/org.apache.karaf.decanter.processor.groupby.cfg
new file mode 100644
index 0000000..26213d5
--- /dev/null
+++ b/processor/groupby/src/main/cfg/org.apache.karaf.decanter.processor.groupby.cfg
@@ -0,0 +1,43 @@
+################################################################################
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+################################################################################
+
+#
+# Decanter GroupBy processor
+#
+
+#
+# Destination dispatcher topics where to send the aggregated events
+#
+#target.topics=decanter/process/groupby
+
+#
+# Aggregation period in seconds
+#
+#period=60
+
+#
+# List of grouping properties
+#
+#groupBy=first,second
+
+#
+# If true, grouped events properties are flatten (all properties in the event) aka Map<String,Object>
+# If false, grouped events properties are inner grouped map aka Map<int, Map<String,Object>>
+#
+#flatten=true
\ No newline at end of file
diff --git a/processor/groupby/src/main/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessor.java b/processor/groupby/src/main/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessor.java
new file mode 100644
index 0000000..ad6f8b6
--- /dev/null
+++ b/processor/groupby/src/main/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.groupby;
+
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Component(
+ name = "org.apache.karaf.decanter.processor.groupby",
+ immediate = true,
+ property = EventConstants.EVENT_TOPIC + "=decanter/collect/*"
+)
+public class GroupByProcessor implements EventHandler {
+
+ @Reference
+ private EventAdmin dispatcher;
+
+ private String targetTopics;
+ private String groupBy;
+ private boolean flat;
+
+ private ConcurrentHashMap<Integer, List<Event>> accumulation = new ConcurrentHashMap<>();
+
+ private ScheduledExecutorService scheduledExecutorService;
+
+ @Activate
+ public void activate(ComponentContext componentContext) {
+ activate(componentContext.getProperties());
+ }
+
+ public void activate(Dictionary<String, Object> configuration) {
+ targetTopics = (configuration.get("target.topics") != null) ? configuration.get("target.topics").toString() : "decanter/process/aggregate";
+ long period = (configuration.get("period") != null) ? Long.parseLong(configuration.get("period").toString()) : 60L;
+ groupBy = (configuration.get("groupBy") != null) ? configuration.get("groupBy").toString() : null;
+ flat = (configuration.get("flat") != null) ? Boolean.parseBoolean(configuration.get("flat").toString()) : true;
+ scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ scheduledExecutorService.scheduleAtFixedRate(new GroupByTask(), 0, period, TimeUnit.SECONDS);
+ }
+
+ @Deactivate
+ public void deactivate() {
+ scheduledExecutorService.shutdownNow();
+ }
+
+ @Override
+ public void handleEvent(Event event) {
+ String[] groups = groupBy.split(",");
+ int hash = 0;
+ for (String group : groups) {
+ if (event.getProperty(group) == null) {
+ return;
+ } else {
+ hash = hash + event.getProperty(group).hashCode();
+ }
+ }
+ if (accumulation.get(hash) == null) {
+ List<Event> events = new ArrayList<>();
+ events.add(event);
+ accumulation.put(hash, events);
+ } else {
+ accumulation.get(hash).add(event);
+ }
+ }
+
+ class GroupByTask implements Runnable {
+
+ @Override
+ public void run() {
+ if (accumulation.size() > 0) {
+ for (Integer hash : accumulation.keySet()) {
+ Map merge = new HashMap();
+ merge.put("processor", "groupBy");
+ if (flat) {
+ for (Event event : accumulation.get(hash)) {
+ for (String propertyName : event.getPropertyNames()) {
+ merge.put(propertyName, event.getProperty(propertyName));
+ }
+ }
+ } else {
+ List<Map<String, Object>> events = new ArrayList<>();
+ merge.put("events", events);
+ for (Event event : accumulation.get(hash)) {
+ Map<String,Object> properties = new HashMap<>();
+ for (String propertyName : event.getPropertyNames()) {
+ properties.put(propertyName, event.getProperty(propertyName));
+ }
+ events.add(properties);
+ }
+ }
+ // send event
+ String[] topics = targetTopics.split(",");
+ for (String topic : topics) {
+ dispatcher.postEvent(new Event(topic, merge));
+ }
+ }
+ accumulation.clear();
+ }
+ }
+
+ }
+
+ public void setDispatcher(EventAdmin dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+}
diff --git a/processor/groupby/src/test/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessorTest.java b/processor/groupby/src/test/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessorTest.java
new file mode 100644
index 0000000..77e5047
--- /dev/null
+++ b/processor/groupby/src/test/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.groupby;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.util.*;
+
+public class GroupByProcessorTest {
+
+ @Test(timeout = 10000)
+ public void testFlatten() throws Exception {
+ MockDispatcher dispatcher = new MockDispatcher();
+
+ GroupByProcessor processor = new GroupByProcessor();
+ processor.setDispatcher(dispatcher);
+ Hashtable<String, Object> configuration = new Hashtable<>();
+ configuration.put("period", "2");
+ configuration.put("groupBy", "foo,bar");
+ configuration.put("flat", true);
+ processor.activate(configuration);
+
+ Map<String, Object> data1 = new HashMap<>();
+ data1.put("foo", "foo");
+ data1.put("bar", "bar");
+ data1.put("first", "first");
+ Event event1 = new Event("decanter/collect/first", data1);
+ processor.handleEvent(event1);
+
+ Map<String, Object> data2 = new HashMap<>();
+ data2.put("foo", "foo");
+ data2.put("bar", "bar");
+ data2.put("second", "second");
+ Event event2 = new Event("decanter/collect/second", data2);
+ processor.handleEvent(event2);
+
+ Map<String, Object> data3 = new HashMap<>();
+ data3.put("third", "third");
+ Event event3 = new Event("decanter/collect/third", data3);
+ processor.handleEvent(event3);
+
+ Map<String, Object> data4 = new HashMap<>();
+ data4.put("foo", "foo");
+ data4.put("bar", "bar");
+ data4.put("fourth", "fourth");
+ Event event4 = new Event("decanter/collect/fourth", data4);
+ processor.handleEvent(event4);
+
+ Map<String, Object> data5 = new HashMap<>();
+ data5.put("foo", "other");
+ data5.put("bar", "other");
+ data5.put("fifth", "fifth");
+ Event event5 = new Event("decanter/collect/fifth", data5);
+ processor.handleEvent(event5);
+
+ while (dispatcher.postedEvents.size() != 2) {
+ Thread.sleep(200);
+ }
+
+ Assert.assertEquals(2, dispatcher.postedEvents.size());
+
+ Assert.assertEquals("fifth", dispatcher.postedEvents.get(0).getProperty("fifth"));
+ Assert.assertEquals("other", dispatcher.postedEvents.get(0).getProperty("bar"));
+ Assert.assertEquals("other", dispatcher.postedEvents.get(0).getProperty("foo"));
+
+ Assert.assertEquals("bar", dispatcher.postedEvents.get(1).getProperty("bar"));
+ Assert.assertEquals("fourth", dispatcher.postedEvents.get(1).getProperty("fourth"));
+ Assert.assertEquals("first", dispatcher.postedEvents.get(1).getProperty("first"));
+ Assert.assertEquals("foo", dispatcher.postedEvents.get(1).getProperty("foo"));
+ Assert.assertEquals("second", dispatcher.postedEvents.get(1).getProperty("second"));
+
+ processor.deactivate();
+ }
+
+ @Test(timeout = 10000)
+ @Ignore("Avoid thread conflict with the other test")
+ public void testNotFlatten() throws Exception {
+ MockDispatcher dispatcher = new MockDispatcher();
+
+ GroupByProcessor processor = new GroupByProcessor();
+ processor.setDispatcher(dispatcher);
+ Hashtable<String, Object> configuration = new Hashtable<>();
+ configuration.put("period", "2");
+ configuration.put("groupBy", "foo,bar");
+ configuration.put("flat", false);
+ processor.activate(configuration);
+
+ Map<String, Object> data1 = new HashMap<>();
+ data1.put("foo", "foo");
+ data1.put("bar", "bar");
+ data1.put("first", "first");
+ Event event1 = new Event("decanter/collect/first", data1);
+ processor.handleEvent(event1);
+
+ Map<String, Object> data2 = new HashMap<>();
+ data2.put("foo", "foo");
+ data2.put("bar", "bar");
+ data2.put("second", "second");
+ Event event2 = new Event("decanter/collect/second", data2);
+ processor.handleEvent(event2);
+
+ Map<String, Object> data3 = new HashMap<>();
+ data3.put("third", "third");
+ Event event3 = new Event("decanter/collect/third", data3);
+ processor.handleEvent(event3);
+
+ Map<String, Object> data4 = new HashMap<>();
+ data4.put("foo", "foo");
+ data4.put("bar", "bar");
+ data4.put("fourth", "fourth");
+ Event event4 = new Event("decanter/collect/fourth", data4);
+ processor.handleEvent(event4);
+
+ Map<String, Object> data5 = new HashMap<>();
+ data5.put("foo", "other");
+ data5.put("bar", "other");
+ data5.put("fifth", "fifth");
+ Event event5 = new Event("decanter/collect/fifth", data5);
+ processor.handleEvent(event5);
+
+ while (dispatcher.postedEvents.size() != 2) {
+ Thread.sleep(200);
+ }
+
+ Assert.assertEquals(2, dispatcher.postedEvents.size());
+
+ List<Map<String, Object>> events = (List<Map<String, Object>>) dispatcher.postedEvents.get(0).getProperty("events");
+ Assert.assertEquals(1, events.size());
+
+ events = (List<Map<String, Object>>) dispatcher.postedEvents.get(1).getProperty("events");
+ Assert.assertEquals(3, events.size());
+
+ processor.deactivate();
+ }
+
+ class MockDispatcher implements EventAdmin {
+
+ public List<Event> postedEvents = new ArrayList<>();
+ public List<Event> sentEvents = new ArrayList<>();
+
+ @Override
+ public void postEvent(Event event) {
+ postedEvents.add(event);
+ }
+
+ @Override
+ public void sendEvent(Event event) {
+ sentEvents.add(event);
+ }
+ }
+
+}
diff --git a/processor/pom.xml b/processor/pom.xml
index a0749ea..dd95953 100644
--- a/processor/pom.xml
+++ b/processor/pom.xml
@@ -36,6 +36,7 @@
<modules>
<module>passthrough</module>
<module>aggregate</module>
+ <module>groupby</module>
</modules>
</project>
\ No newline at end of file