Merge pull request #214 from jbonofre/KARAF-6864

[KARAF-6864] Create groupBy processor
diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index 61d72d9..80c75bf 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -462,6 +462,16 @@
         <feature>decanter-processor-aggregate-core</feature>
     </feature>
 
+    <feature name="decanter-processor-groupby-core" version="${project.version}" description="Karaf Decanter GroupBy Processor core">
+        <feature>decanter-common</feature>
+        <bundle>mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.groupby/${project.version}</bundle>
+    </feature>
+
+    <feature name="decanter-processor-groupby" version="${project.version}" description="Karaf Decanter GroupBy Processor">
+        <configfile finalname="/etc/org.apache.karaf.decanter.processor.groupby.cfg">mvn:org.apache.karaf.decanter.processor/org.apache.karaf.decanter.processor.groupby/${project.version}/cfg</configfile>
+        <feature>decanter-processor-groupby-core</feature>
+    </feature>
+
     <feature name="decanter-alerting-core" version="${project.version}" description="Karaf Decanter Alerting core">
         <feature>decanter-common</feature>
         <bundle>mvn:org.apache.karaf.decanter.alerting/org.apache.karaf.decanter.alerting.service/${project.version}</bundle>
diff --git a/itest/src/test/java/org/apache/karaf/decanter/itests/processor/GroupByProcessorTest.java b/itest/src/test/java/org/apache/karaf/decanter/itests/processor/GroupByProcessorTest.java
new file mode 100644
index 0000000..6cdc618
--- /dev/null
+++ b/itest/src/test/java/org/apache/karaf/decanter/itests/processor/GroupByProcessorTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.itests.processor;
+
+import org.apache.karaf.itests.KarafTestSupport;
+import org.apache.karaf.jaas.boot.principal.RolePrincipal;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.Option;
+import org.ops4j.pax.exam.junit.PaxExam;
+import org.ops4j.pax.exam.karaf.options.KarafDistributionOption;
+import org.ops4j.pax.exam.spi.reactors.ExamReactorStrategy;
+import org.ops4j.pax.exam.spi.reactors.PerClass;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.util.*;
+import java.util.stream.Stream;
+
+@RunWith(PaxExam.class)
+@ExamReactorStrategy(PerClass.class)
+public class GroupByProcessorTest extends KarafTestSupport {
+
+    @Configuration
+    public Option[] config() {
+        Option[] options = new Option[]{
+                KarafDistributionOption.editConfigurationFilePut("etc/system.properties", "decanter.version", System.getProperty("decanter.version"))
+        };
+        return Stream.of(super.config(), options).flatMap(Stream::of).toArray(Option[]::new);
+    }
+
+    @Test
+    public void test() throws Exception {
+        System.out.println("Installing Decanter Processor GroupBy ...");
+
+        File file = new File(System.getProperty("karaf.etc"), "org.apache.karaf.decanter.processor.groupby.cfg");
+        try (BufferedWriter writer = new BufferedWriter(new FileWriter(file))) {
+            writer.write("groupBy=foo");
+        }
+
+        String configList = executeCommand("config:list '(service.pid=org.apache.karaf.decanter.processor.groupby)'");
+        while (!configList.contains("service.pid")) {
+            Thread.sleep(500);
+            configList = executeCommand("config:list '(service.pid=org.apache.karaf.decanter.processor.groupby)'");
+        }
+
+        System.out.println(executeCommand("feature:repo-add decanter " + System.getProperty("decanter.version"), new RolePrincipal("admin")));
+        System.out.println(executeCommand("feature:install decanter-processor-groupby", new RolePrincipal("admin")));
+
+        System.out.println("Adding test event handler ...");
+        final List<Event> received = new ArrayList<>();
+        EventHandler handler = new EventHandler() {
+            @Override
+            public void handleEvent(Event event) {
+                received.add(event);
+            }
+        };
+        Hashtable<String, String> serviceProperties = new Hashtable<>();
+        serviceProperties.put(EventConstants.EVENT_TOPIC, "decanter/process/*");
+        bundleContext.registerService(EventHandler.class, handler, serviceProperties);
+
+        System.out.println("Sending test events ...");
+        EventAdmin dispatcher = getOsgiService(EventAdmin.class);
+        Map<String, Object> data1 = new HashMap<>();
+        data1.put("foo", "bar");
+        data1.put("first", "first");
+        dispatcher.sendEvent(new Event("decanter/collect/test", data1));
+        Map<String, Object> data2 = new HashMap<>();
+        data2.put("other", "other");
+        data2.put("second", "second");
+        dispatcher.sendEvent(new Event("decanter/collect/test", data2));
+        Map<String, Object> data3 = new HashMap<>();
+        data3.put("foo", "bar");
+        data3.put("third", "third");
+        dispatcher.sendEvent(new Event("decanter/collect/test", data3));
+
+        System.out.println("Waiting events ...");
+        while (received.size() < 1) {
+            Thread.sleep(500);
+        }
+
+        System.out.println("");
+
+        for (int i = 0; i < received.size(); i++) {
+            for (String property : received.get(i).getPropertyNames()) {
+                System.out.println(property + " = " + received.get(i).getProperty(property));
+            }
+            System.out.println("========");
+        }
+
+        System.out.println("");
+
+        Assert.assertEquals(1, received.size());
+
+    }
+
+}
diff --git a/manual/src/main/asciidoc/user-guide/processors.adoc b/manual/src/main/asciidoc/user-guide/processors.adoc
index 42bf87e..7696ee0 100644
--- a/manual/src/main/asciidoc/user-guide/processors.adoc
+++ b/manual/src/main/asciidoc/user-guide/processors.adoc
@@ -52,7 +52,7 @@
 
 By default, the "merged" event is sent every minute. You can change this using the `period` configuration.
 
-You can provisiong `etc/org.apache.karaf.decanter.processor.aggregate.cfg` configuration file with:
+You can provision `etc/org.apache.karaf.decanter.processor.aggregate.cfg` configuration file with:
 
 ----
 period=120 # this is the period in seconds
@@ -76,4 +76,62 @@
 overwrite=true
 ----
 
-Then, if a property already exist in the aggregator, its value will be overwritten by the new event value received in the aggregator.
\ No newline at end of file
+Then, if a property already exist in the aggregator, its value will be overwritten by the new event value received in the aggregator.
+
+==== GroupBy
+
+This processor "groups" events containing same properties values during a period.
+
+For instance, you configure the GroupBy processor to group events using `foo` and `bar` properties. Then you receive
+the following three events:
+
+1. first event containing: `{ "foo":"foo","bar":"bar","first":"value1" }`
+2. second event containing: `{ "hello":"world","second":"value2" }`
+3. third event containing: `{ "foo":"foo","bar":"bar","third":"value3"}`
+
+The groupBy processor will create (and send) one event containing:
+
+* if you choose to "flatten" the properties, the event will contain: `{ "foo":"foo", "bar":"bar", "first":"value1","third":"value3" }`
+* if you chosse not to "flatten" the properties, the event will contain: `{ "events":[ { "foo":"foo","bar":"bar","first":"value1" }, { "foo":"foo","bar":"bar","third":"value3" } ] }`
+
+You can install this processor using the `decanter-processor-groupby` feature:
+
+----
+karaf@root()> feature:install decanter-processor-groupby
+----
+
+By default, the "merged" event is sent every minute. You can change this using the `period` configuration.
+
+The GroupBy processor is configured via `etc/org.apache.karaf.decanter.processor.groupby.cfg` configuration file:
+
+----
+#
+# Decanter GroupBy processor
+#
+
+#
+# Destination dispatcher topics where to send the aggregated events
+#
+#target.topics=decanter/process/groupby
+
+#
+# Aggregation period in seconds
+#
+#period=60
+
+#
+# List of grouping properties
+#
+#groupBy=first,second
+
+#
+# If true, grouped events properties are flatten (all properties in the event) aka Map<String,Object>
+# If false, grouped events properties are inner grouped map aka Map<int, Map<String,Object>>
+#
+#flatten=true
+----
+
+* The `target.topics` property defines the list of Decanter topics (separated by `,`) where the resulting events will be sent.
+* The `period` property defines the retention period to accumulate the incoming events
+* The `groupBy` property defines the property names (separated by `,`) as grouping term
+* The `flatten` property defines the way the resulting event will be created. If `true`, all events properties will be store directly (flat) in the resulting event. If `false`, the resulting event will contain an array of properties (from the original grouped events).
\ No newline at end of file
diff --git a/processor/groupby/pom.xml b/processor/groupby/pom.xml
new file mode 100644
index 0000000..f652aad
--- /dev/null
+++ b/processor/groupby/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <!--
+
+        Licensed to the Apache Software Foundation (ASF) under one or more
+        contributor license agreements.  See the NOTICE file distributed with
+        this work for additional information regarding copyright ownership.
+        The ASF licenses this file to You under the Apache License, Version 2.0
+        (the "License"); you may not use this file except in compliance with
+        the License.  You may obtain a copy of the License at
+
+           http://www.apache.org/licenses/LICENSE-2.0
+
+        Unless required by applicable law or agreed to in writing, software
+        distributed under the License is distributed on an "AS IS" BASIS,
+        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+        See the License for the specific language governing permissions and
+        limitations under the License.
+    -->
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.karaf.decanter</groupId>
+        <artifactId>processor</artifactId>
+        <version>2.6.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <groupId>org.apache.karaf.decanter.processor</groupId>
+    <artifactId>org.apache.karaf.decanter.processor.groupby</artifactId>
+    <packaging>bundle</packaging>
+    <name>Apache Karaf :: Decanter :: Processor :: GroupBy</name>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>attach-artifact</goal>
+                        </goals>
+                        <configuration>
+                            <artifacts>
+                                <artifact>
+                                    <file>src/main/cfg/org.apache.karaf.decanter.processor.groupby.cfg</file>
+                                    <type>cfg</type>
+                                </artifact>
+                            </artifacts>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>maven-bundle-plugin</artifactId>
+                <inherited>true</inherited>
+                <extensions>true</extensions>
+                <configuration>
+                    <obrRepository>NONE</obrRepository>
+                    <instructions>
+                        <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+                        <Export-Package>!*</Export-Package>
+                        <Import-Package>*</Import-Package>
+                        <Private-Package>
+                            org.apache.karaf.decanter.processor.groupby
+                        </Private-Package>
+                    </instructions>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/processor/groupby/src/main/cfg/org.apache.karaf.decanter.processor.groupby.cfg b/processor/groupby/src/main/cfg/org.apache.karaf.decanter.processor.groupby.cfg
new file mode 100644
index 0000000..26213d5
--- /dev/null
+++ b/processor/groupby/src/main/cfg/org.apache.karaf.decanter.processor.groupby.cfg
@@ -0,0 +1,43 @@
+################################################################################
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+################################################################################
+
+#
+# Decanter GroupBy processor
+#
+
+#
+# Destination dispatcher topics where to send the aggregated events
+#
+#target.topics=decanter/process/groupby
+
+#
+# Aggregation period in seconds
+#
+#period=60
+
+#
+# List of grouping properties
+#
+#groupBy=first,second
+
+#
+# If true, grouped events properties are flatten (all properties in the event) aka Map<String,Object>
+# If false, grouped events properties are inner grouped map aka Map<int, Map<String,Object>>
+#
+#flatten=true
\ No newline at end of file
diff --git a/processor/groupby/src/main/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessor.java b/processor/groupby/src/main/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessor.java
new file mode 100644
index 0000000..ad6f8b6
--- /dev/null
+++ b/processor/groupby/src/main/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.groupby;
+
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Component(
+        name = "org.apache.karaf.decanter.processor.groupby",
+        immediate = true,
+        property = EventConstants.EVENT_TOPIC + "=decanter/collect/*"
+)
+public class GroupByProcessor implements EventHandler {
+
+    @Reference
+    private EventAdmin dispatcher;
+
+    private String targetTopics;
+    private String groupBy;
+    private boolean flat;
+
+    private ConcurrentHashMap<Integer, List<Event>> accumulation = new ConcurrentHashMap<>();
+
+    private ScheduledExecutorService scheduledExecutorService;
+
+    @Activate
+    public void activate(ComponentContext componentContext) {
+        activate(componentContext.getProperties());
+    }
+
+    public void activate(Dictionary<String, Object> configuration) {
+        targetTopics = (configuration.get("target.topics") != null) ? configuration.get("target.topics").toString() : "decanter/process/aggregate";
+        long period = (configuration.get("period") != null) ? Long.parseLong(configuration.get("period").toString()) : 60L;
+        groupBy = (configuration.get("groupBy") != null) ? configuration.get("groupBy").toString() : null;
+        flat = (configuration.get("flat") != null) ? Boolean.parseBoolean(configuration.get("flat").toString()) : true;
+        scheduledExecutorService = Executors.newScheduledThreadPool(1);
+        scheduledExecutorService.scheduleAtFixedRate(new GroupByTask(), 0, period, TimeUnit.SECONDS);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        scheduledExecutorService.shutdownNow();
+    }
+
+    @Override
+    public void handleEvent(Event event) {
+        String[] groups = groupBy.split(",");
+        int hash = 0;
+        for (String group : groups) {
+            if (event.getProperty(group) == null) {
+                return;
+            } else {
+                hash = hash + event.getProperty(group).hashCode();
+            }
+        }
+        if (accumulation.get(hash) == null) {
+            List<Event> events = new ArrayList<>();
+            events.add(event);
+            accumulation.put(hash, events);
+        } else {
+            accumulation.get(hash).add(event);
+        }
+    }
+
+    class GroupByTask implements Runnable {
+
+        @Override
+        public void run() {
+            if (accumulation.size() > 0) {
+                for (Integer hash : accumulation.keySet()) {
+                    Map merge = new HashMap();
+                    merge.put("processor", "groupBy");
+                    if (flat) {
+                        for (Event event : accumulation.get(hash)) {
+                            for (String propertyName : event.getPropertyNames()) {
+                                merge.put(propertyName, event.getProperty(propertyName));
+                            }
+                        }
+                    } else {
+                        List<Map<String, Object>> events = new ArrayList<>();
+                        merge.put("events", events);
+                        for (Event event : accumulation.get(hash)) {
+                            Map<String,Object> properties = new HashMap<>();
+                            for (String propertyName : event.getPropertyNames()) {
+                                properties.put(propertyName, event.getProperty(propertyName));
+                            }
+                            events.add(properties);
+                        }
+                    }
+                    // send event
+                    String[] topics = targetTopics.split(",");
+                    for (String topic : topics) {
+                        dispatcher.postEvent(new Event(topic, merge));
+                    }
+                }
+                accumulation.clear();
+            }
+        }
+
+    }
+
+    public void setDispatcher(EventAdmin dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+}
diff --git a/processor/groupby/src/test/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessorTest.java b/processor/groupby/src/test/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessorTest.java
new file mode 100644
index 0000000..77e5047
--- /dev/null
+++ b/processor/groupby/src/test/java/org/apache/karaf/decanter/processor/groupby/GroupByProcessorTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.processor.groupby;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+import java.util.*;
+
+public class GroupByProcessorTest {
+
+    @Test(timeout = 10000)
+    public void testFlatten() throws Exception {
+        MockDispatcher dispatcher = new MockDispatcher();
+
+        GroupByProcessor processor = new GroupByProcessor();
+        processor.setDispatcher(dispatcher);
+        Hashtable<String, Object> configuration = new Hashtable<>();
+        configuration.put("period", "2");
+        configuration.put("groupBy", "foo,bar");
+        configuration.put("flat", true);
+        processor.activate(configuration);
+
+        Map<String, Object> data1 = new HashMap<>();
+        data1.put("foo", "foo");
+        data1.put("bar", "bar");
+        data1.put("first", "first");
+        Event event1 = new Event("decanter/collect/first", data1);
+        processor.handleEvent(event1);
+
+        Map<String, Object> data2 = new HashMap<>();
+        data2.put("foo", "foo");
+        data2.put("bar", "bar");
+        data2.put("second", "second");
+        Event event2 = new Event("decanter/collect/second", data2);
+        processor.handleEvent(event2);
+
+        Map<String, Object> data3 = new HashMap<>();
+        data3.put("third", "third");
+        Event event3 = new Event("decanter/collect/third", data3);
+        processor.handleEvent(event3);
+
+        Map<String, Object> data4 = new HashMap<>();
+        data4.put("foo", "foo");
+        data4.put("bar", "bar");
+        data4.put("fourth", "fourth");
+        Event event4 = new Event("decanter/collect/fourth", data4);
+        processor.handleEvent(event4);
+
+        Map<String, Object> data5 = new HashMap<>();
+        data5.put("foo", "other");
+        data5.put("bar", "other");
+        data5.put("fifth", "fifth");
+        Event event5 = new Event("decanter/collect/fifth", data5);
+        processor.handleEvent(event5);
+
+        while (dispatcher.postedEvents.size() != 2) {
+            Thread.sleep(200);
+        }
+
+        Assert.assertEquals(2, dispatcher.postedEvents.size());
+
+        Assert.assertEquals("fifth", dispatcher.postedEvents.get(0).getProperty("fifth"));
+        Assert.assertEquals("other", dispatcher.postedEvents.get(0).getProperty("bar"));
+        Assert.assertEquals("other", dispatcher.postedEvents.get(0).getProperty("foo"));
+
+        Assert.assertEquals("bar", dispatcher.postedEvents.get(1).getProperty("bar"));
+        Assert.assertEquals("fourth", dispatcher.postedEvents.get(1).getProperty("fourth"));
+        Assert.assertEquals("first", dispatcher.postedEvents.get(1).getProperty("first"));
+        Assert.assertEquals("foo", dispatcher.postedEvents.get(1).getProperty("foo"));
+        Assert.assertEquals("second", dispatcher.postedEvents.get(1).getProperty("second"));
+
+        processor.deactivate();
+    }
+
+    @Test(timeout = 10000)
+    @Ignore("Avoid thread conflict with the other test")
+    public void testNotFlatten() throws Exception {
+        MockDispatcher dispatcher = new MockDispatcher();
+
+        GroupByProcessor processor = new GroupByProcessor();
+        processor.setDispatcher(dispatcher);
+        Hashtable<String, Object> configuration = new Hashtable<>();
+        configuration.put("period", "2");
+        configuration.put("groupBy", "foo,bar");
+        configuration.put("flat", false);
+        processor.activate(configuration);
+
+        Map<String, Object> data1 = new HashMap<>();
+        data1.put("foo", "foo");
+        data1.put("bar", "bar");
+        data1.put("first", "first");
+        Event event1 = new Event("decanter/collect/first", data1);
+        processor.handleEvent(event1);
+
+        Map<String, Object> data2 = new HashMap<>();
+        data2.put("foo", "foo");
+        data2.put("bar", "bar");
+        data2.put("second", "second");
+        Event event2 = new Event("decanter/collect/second", data2);
+        processor.handleEvent(event2);
+
+        Map<String, Object> data3 = new HashMap<>();
+        data3.put("third", "third");
+        Event event3 = new Event("decanter/collect/third", data3);
+        processor.handleEvent(event3);
+
+        Map<String, Object> data4 = new HashMap<>();
+        data4.put("foo", "foo");
+        data4.put("bar", "bar");
+        data4.put("fourth", "fourth");
+        Event event4 = new Event("decanter/collect/fourth", data4);
+        processor.handleEvent(event4);
+
+        Map<String, Object> data5 = new HashMap<>();
+        data5.put("foo", "other");
+        data5.put("bar", "other");
+        data5.put("fifth", "fifth");
+        Event event5 = new Event("decanter/collect/fifth", data5);
+        processor.handleEvent(event5);
+
+        while (dispatcher.postedEvents.size() != 2) {
+            Thread.sleep(200);
+        }
+
+        Assert.assertEquals(2, dispatcher.postedEvents.size());
+
+        List<Map<String, Object>> events = (List<Map<String, Object>>) dispatcher.postedEvents.get(0).getProperty("events");
+        Assert.assertEquals(1, events.size());
+
+        events = (List<Map<String, Object>>) dispatcher.postedEvents.get(1).getProperty("events");
+        Assert.assertEquals(3, events.size());
+
+        processor.deactivate();
+    }
+
+    class MockDispatcher implements EventAdmin {
+
+        public List<Event> postedEvents = new ArrayList<>();
+        public List<Event> sentEvents = new ArrayList<>();
+
+        @Override
+        public void postEvent(Event event) {
+            postedEvents.add(event);
+        }
+
+        @Override
+        public void sendEvent(Event event) {
+            sentEvents.add(event);
+        }
+    }
+
+}
diff --git a/processor/pom.xml b/processor/pom.xml
index a0749ea..dd95953 100644
--- a/processor/pom.xml
+++ b/processor/pom.xml
@@ -36,6 +36,7 @@
     <modules>
         <module>passthrough</module>
         <module>aggregate</module>
+        <module>groupby</module>
     </modules>
 
 </project>
\ No newline at end of file