Use the Component DSL to run the whiteboard, contribute integration tests from the BRAIN-IoT prototype
diff --git a/org.apache.aries.typedevent.bus/pom.xml b/org.apache.aries.typedevent.bus/pom.xml
index db237cf..5c38b68 100644
--- a/org.apache.aries.typedevent.bus/pom.xml
+++ b/org.apache.aries.typedevent.bus/pom.xml
@@ -22,6 +22,18 @@
     
     <artifactId>org.apache.aries.typedevent.bus</artifactId>
     
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.apache.aries.typedevent</groupId>
+                <artifactId>typedevent-test-bom</artifactId>
+                <version>${project.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+    
     <dependencies>
         <dependency>
             <groupId>org.osgi</groupId>
@@ -67,5 +79,38 @@
             <groupId>org.mockito</groupId>
             <artifactId>mockito-junit-jupiter</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.test.junit5</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+        </dependency>
     </dependencies>
+    
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-resolver-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-testing-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-run-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/run.bndrun b/org.apache.aries.typedevent.bus/run.bndrun
new file mode 100644
index 0000000..91345b1
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/run.bndrun
@@ -0,0 +1,43 @@
+#    Licensed to the Apache Software Foundation (ASF) under one
+#    or more contributor license agreements.  See the NOTICE file
+#    distributed with this work for additional information
+#    regarding copyright ownership.  The ASF licenses this file
+#    to you under the Apache License, Version 2.0 (the
+#    "License"); you may not use this file except in compliance
+#    with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing,
+#    software distributed under the License is distributed on an
+#    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#    KIND, either express or implied.  See the License for the
+#    specific language governing permissions and limitations
+#    under the License.
+
+-tester: biz.aQute.tester.junit-platform
+
+-runfw: org.apache.felix.framework
+
+-runrequires: bnd.identity;id="org.apache.aries.typedevent.bus",\
+  bnd.identity;id="org.apache.felix.gogo.shell",\
+  bnd.identity;id="org.apache.felix.gogo.runtime",\
+  bnd.identity;id="org.apache.felix.gogo.command"
+  
+
+-resolve.effective: active
+-runbundles: \
+    ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
+    ch.qos.logback.core;version='[1.2.3,1.2.4)',\
+    org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
+    org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)',\
+    org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
+    org.apache.felix.converter;version='[1.0.14,1.0.15)',\
+    org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
+    org.osgi.util.function;version='[1.1.0,1.1.1)',\
+    org.osgi.util.promise;version='[1.1.1,1.1.2)',\
+    org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
+    slf4j.api;version='[1.7.30,1.7.31)',\
+    org.apache.felix.gogo.command;version='[1.0.2,1.0.3)',\
+    org.apache.felix.gogo.runtime;version='[1.0.10,1.0.11)',\
+    org.apache.felix.gogo.shell;version='[1.0.0,1.0.1)'
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java
index af61b3f..06bc1d3 100644
--- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java
@@ -18,28 +18,36 @@
 package org.apache.aries.typedevent.bus.impl;
 
 import static java.util.function.Function.identity;
+import static org.apache.aries.component.dsl.OSGi.all;
+import static org.apache.aries.component.dsl.OSGi.coalesce;
+import static org.apache.aries.component.dsl.OSGi.configuration;
+import static org.apache.aries.component.dsl.OSGi.nothing;
+import static org.apache.aries.component.dsl.OSGi.just;
+import static org.apache.aries.component.dsl.OSGi.register;
+import static org.apache.aries.component.dsl.OSGi.service;
+import static org.apache.aries.component.dsl.OSGi.serviceReferences;
 
 import java.util.Arrays;
 import java.util.Dictionary;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.stream.Collectors;
 
+import org.apache.aries.component.dsl.OSGi;
+import org.apache.aries.component.dsl.OSGiResult;
 import org.osgi.annotation.bundle.Header;
 import org.osgi.framework.BundleActivator;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.Constants;
 import org.osgi.framework.ServiceReference;
-import org.osgi.framework.ServiceRegistration;
 import org.osgi.service.typedevent.TypedEventBus;
 import org.osgi.service.typedevent.TypedEventHandler;
 import org.osgi.service.typedevent.UnhandledEventHandler;
 import org.osgi.service.typedevent.UntypedEventHandler;
 import org.osgi.service.typedevent.monitor.TypedEventMonitor;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,15 +56,7 @@
 
     private static final Logger _log = LoggerFactory.getLogger(TypedEventBusActivator.class);
 
-    private TypedEventMonitorImpl monitorImpl;
-    private ServiceRegistration<TypedEventMonitor> monitorReg;
-
-    private TypedEventBusImpl busImpl;
-    private ServiceRegistration<TypedEventBus> busReg;
-
-    private ServiceTracker<TypedEventHandler<?>, TypedEventHandler<?>> typedTracker;
-    private ServiceTracker<UntypedEventHandler, UntypedEventHandler> untypedTracker;
-    private ServiceTracker<UnhandledEventHandler, UnhandledEventHandler> unhandledTracker;
+    OSGiResult eventBus;
 
     @Override
     public void start(BundleContext bundleContext) throws Exception {
@@ -64,120 +64,79 @@
             _log.debug("Aries Typed Event Bus Starting");
         }
 
-        // TODO use Config Admin
-
-        Map<String, Object> map = new HashMap<String, Object>();
-
-        createEventBus(bundleContext, map);
+        eventBus = coalesce(
+                    configuration("org.apache.aries.typedevent.bus"),
+                    just(Hashtable::new)
+                )
+                .map(this::toConfigProps)
+                .flatMap(configuration -> createProgram(configuration))
+                .run(bundleContext);
 
         if (_log.isDebugEnabled()) {
             _log.debug("Aries Typed Event Bus Started");
         }
     }
 
-    private void createEventBus(BundleContext bundleContext, Map<String, ?> configuration) throws Exception {
+    private OSGi<?> createProgram(Map<String, ?> configuration) {
 
-        Dictionary<String, Object> serviceProps = toServiceProps(configuration);
+        Map<String, Object> serviceProps = toServiceProps(configuration);
 
-        monitorImpl = new TypedEventMonitorImpl(configuration);
-        busImpl = new TypedEventBusImpl(monitorImpl, configuration);
+        return just(configuration)
+                .map(TypedEventMonitorImpl::new)
+                .effects(x -> { }, TypedEventMonitorImpl::destroy)
+                .flatMap(
+                        temi -> register(TypedEventMonitor.class, temi, serviceProps)
+                                .then(just(new TypedEventBusImpl(temi, configuration))
+                                .effects(TypedEventBusImpl::start, TypedEventBusImpl::stop)))
+                .flatMap(
+                        tebi -> all(
+                                serviceReferences(TypedEventHandler.class, 
+                                        csr -> {
+                                            tebi.updatedTypedEventHandler(
+                                                    getServiceProps(csr.getServiceReference()));
+                                            return false;
+                                        })
+                                        .flatMap(csr -> service(csr)
+                                                .effects(
+                                                         handler -> tebi.addTypedEventHandler(handler,
+                                                                    getServiceProps(csr.getServiceReference())),
+                                                         handler -> tebi.removeTypedEventHandler(handler,
+                                                                    getServiceProps(csr.getServiceReference())))),
+                                serviceReferences(UntypedEventHandler.class, 
+                                        csr -> {
+                                            tebi.updatedTypedEventHandler(
+                                                    getServiceProps(csr.getServiceReference()));
+                                            return false;
+                                        })
+                                        .flatMap(csr -> service(csr)
+                                                .effects(
+                                                         handler -> tebi.addUntypedEventHandler(handler,
+                                                                    getServiceProps(csr.getServiceReference())),
+                                                         handler -> tebi.removeUntypedEventHandler(handler,
+                                                                    getServiceProps(csr.getServiceReference())))),
+                                serviceReferences(UnhandledEventHandler.class)
+                                        .flatMap(csr -> service(csr)
+                                                .effects(handler -> tebi.addUnhandledEventHandler(handler,
+                                                                    getServiceProps(csr.getServiceReference())),
+                                                         handler -> tebi.removeUnhandledEventHandler(handler,
+                                                                    getServiceProps(csr.getServiceReference())))),
+                                register(TypedEventBus.class, tebi, serviceProps)
+                                        .flatMap(x -> nothing())));
 
-        untypedTracker = new ServiceTracker<>(bundleContext, UntypedEventHandler.class,
-                new ServiceTrackerCustomizer<UntypedEventHandler, UntypedEventHandler>() {
-
-                    @Override
-                    public UntypedEventHandler addingService(ServiceReference<UntypedEventHandler> reference) {
-                        UntypedEventHandler service = bundleContext.getService(reference);
-                        busImpl.addUntypedEventHandler(service, getServiceProps(reference));
-                        return service;
-                    }
-
-                    @Override
-                    public void modifiedService(ServiceReference<UntypedEventHandler> reference,
-                            UntypedEventHandler service) {
-                        busImpl.updatedUntypedEventHandler(service, getServiceProps(reference));
-                    }
-
-                    @Override
-                    public void removedService(ServiceReference<UntypedEventHandler> reference,
-                            UntypedEventHandler service) {
-                        busImpl.removeUntypedEventHandler(service, getServiceProps(reference));
-                    }
-                });
-
-        untypedTracker = new ServiceTracker<>(bundleContext, UntypedEventHandler.class,
-                new ServiceTrackerCustomizer<UntypedEventHandler, UntypedEventHandler>() {
-
-                    @Override
-                    public UntypedEventHandler addingService(ServiceReference<UntypedEventHandler> reference) {
-                        UntypedEventHandler service = bundleContext.getService(reference);
-                        busImpl.addUntypedEventHandler(service, getServiceProps(reference));
-                        return service;
-                    }
-
-                    @Override
-                    public void modifiedService(ServiceReference<UntypedEventHandler> reference,
-                            UntypedEventHandler service) {
-                        busImpl.updatedUntypedEventHandler(service, getServiceProps(reference));
-                    }
-
-                    @Override
-                    public void removedService(ServiceReference<UntypedEventHandler> reference,
-                            UntypedEventHandler service) {
-                        busImpl.removeUntypedEventHandler(service, getServiceProps(reference));
-                    }
-                });
-
-        unhandledTracker = new ServiceTracker<>(bundleContext, UnhandledEventHandler.class,
-                new ServiceTrackerCustomizer<UnhandledEventHandler, UnhandledEventHandler>() {
-
-                    @Override
-                    public UnhandledEventHandler addingService(ServiceReference<UnhandledEventHandler> reference) {
-                        UnhandledEventHandler service = bundleContext.getService(reference);
-                        busImpl.addUnhandledEventHandler(service, getServiceProps(reference));
-                        return service;
-                    }
-
-                    @Override
-                    public void modifiedService(ServiceReference<UnhandledEventHandler> reference,
-                            UnhandledEventHandler service) {
-                    }
-
-                    @Override
-                    public void removedService(ServiceReference<UnhandledEventHandler> reference,
-                            UnhandledEventHandler service) {
-                        busImpl.removeUnhandledEventHandler(service, getServiceProps(reference));
-                    }
-                });
-
-        try {
-            busImpl.start();
-
-            monitorReg = bundleContext.registerService(TypedEventMonitor.class, monitorImpl, serviceProps);
-
-            typedTracker.open();
-            untypedTracker.open();
-            unhandledTracker.open();
-
-            busReg = bundleContext.registerService(TypedEventBus.class, busImpl, serviceProps);
-
-        } catch (Exception e) {
-            stop(bundleContext);
-        }
 
     }
 
-    private void safeUnregister(ServiceRegistration<?> reg) {
-        try {
-            reg.unregister();
-        } catch (IllegalStateException ise) {
-            // no op
-            // TODO LOG this
+    private Map<String, Object> toConfigProps(Dictionary<String, ?> config) {
+        Enumeration<String> keys = config.keys();
+        Map<String, Object> map = new HashMap<>();
+         while(keys.hasMoreElements()) {
+            String key = keys.nextElement();
+            map.put(key, config.get(key));
         }
-
+        return map;
     }
-
-    private Dictionary<String, Object> toServiceProps(Map<String, ?> config) {
+    
+    private Map<String, Object> toServiceProps(Map<String, ?> config) {
         return config.entrySet().stream().filter(e -> e.getKey() != null && e.getKey().startsWith("."))
                 .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> {
                     throw new IllegalArgumentException("Duplicate key ");
@@ -194,30 +153,7 @@
             _log.debug("Aries Typed Event Bus Stopping");
         }
 
-        // Order matters here
-        if (busReg != null) {
-            safeUnregister(busReg);
-        }
-
-        if (busImpl != null) {
-            busImpl.stop();
-        }
-
-        if (typedTracker != null) {
-            typedTracker.close();
-        }
-        if (untypedTracker != null) {
-            untypedTracker.close();
-        }
-        if (unhandledTracker != null) {
-            unhandledTracker.close();
-        }
-
-        if (monitorReg != null) {
-            safeUnregister(monitorReg);
-        }
-
-        monitorImpl.destroy();
+        eventBus.close();
 
         if (_log.isDebugEnabled()) {
             _log.debug("Aries Typed Event Bus Stopped");
diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index b23df64..5be83f9 100644
--- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -32,7 +32,6 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
 import java.util.stream.Stream;
 
 import org.osgi.framework.Constants;
@@ -84,6 +83,18 @@
      */
     private final Map<Long, List<String>> knownHandlers = new HashMap<>();
 
+    /**
+     * Map access and mutation must be synchronized on {@link #lock}. Values from
+     * the map should be copied as the contents are not thread safe.
+     */
+    private final Map<Long, TypedEventHandler<?>> knownTypedHandlers = new HashMap<>();
+
+    /**
+     * Map access and mutation must be synchronized on {@link #lock}. Values from
+     * the map should be copied as the contents are not thread safe.
+     */
+    private final Map<Long, UntypedEventHandler> knownUntypedHandlers = new HashMap<>();
+
     private final BlockingQueue<EventTask> queue = new LinkedBlockingQueue<>();
 
     /**
@@ -99,47 +110,84 @@
     }
 
     void addTypedEventHandler(TypedEventHandler<?> handler, Map<String, Object> properties) {
-        // TODO try to extract topic name reflectively
-        String defaultTopic = null;
+        Class<?> clazz = discoverTypeForTypedHandler(handler, properties);
+        
+        String defaultTopic = clazz == null ? null : clazz.getName().replace(".", "/");
 
+        doAddEventHandler(topicsToTypedHandlers, knownTypedHandlers, handler, defaultTopic, properties);
+    }
+
+    private Class<?> discoverTypeForTypedHandler(TypedEventHandler<?> handler, Map<String, Object> properties) {
+        Class<?> clazz = null;
         Object type = properties.get(TypedEventConstants.TYPED_EVENT_TYPE);
         if (type != null) {
-            defaultTopic = String.valueOf(type).replace(".", "/");
             try {
-                Class<?> clazz = handler.getClass().getClassLoader().loadClass(String.valueOf(type));
-
-                synchronized (lock) {
-                    typedHandlersToTargetClasses.put(handler, clazz);
-                }
+                 clazz = handler.getClass().getClassLoader().loadClass(String.valueOf(type));
             } catch (ClassNotFoundException e) {
                 // TODO Blow up
                 e.printStackTrace();
             }
         } else {
-            Class<?> clazz = Arrays.stream(handler.getClass().getGenericInterfaces())
-                    .filter(ParameterizedType.class::isInstance).map(ParameterizedType.class::cast)
-                    .filter(t -> TypedEventHandler.class.equals(t.getRawType())).map(t -> t.getActualTypeArguments()[0])
-                    .findFirst().map(Class.class::cast).orElse(null);
-
-            if (clazz != null) {
-                defaultTopic = String.valueOf(type).replace(".", "/");
-                synchronized (lock) {
-                    typedHandlersToTargetClasses.put(handler, clazz);
+            Class<?> toCheck = handler.getClass();
+            outer: while(clazz == null) {
+                clazz = findDirectlyImplemented(toCheck);
+                
+                if(clazz != null) {
+                    break outer;
                 }
-            } else {
-                // TODO Blow Up
+                
+                clazz = processInterfaceHierarchyForClass(toCheck);
+
+                if(clazz != null) {
+                    break outer;
+                }
+                
+                toCheck = toCheck.getSuperclass();
             }
         }
 
-        doAddEventHandler(topicsToTypedHandlers, handler, defaultTopic, properties);
+        if (clazz != null) {
+            synchronized (lock) {
+                typedHandlersToTargetClasses.put(handler, clazz);
+            }
+        } else {
+            // TODO Blow Up
+        }
+        return clazz;
+    }
+
+    private Class<?> processInterfaceHierarchyForClass(Class<?> toCheck) {
+        Class<?> clazz = null;
+        for (Class<?> iface : toCheck.getInterfaces()) {
+            clazz = findDirectlyImplemented(iface);
+            
+            if(clazz != null) {
+                break;
+            }
+            
+            clazz = processInterfaceHierarchyForClass(iface); 
+
+            if(clazz != null) {
+                break;
+            }
+        }
+        return clazz;
+    }
+
+    private Class<?> findDirectlyImplemented(Class<?> toCheck) {
+        return Arrays.stream(toCheck.getGenericInterfaces())
+            .filter(ParameterizedType.class::isInstance)
+            .map(ParameterizedType.class::cast)
+            .filter(t -> TypedEventHandler.class.equals(t.getRawType())).map(t -> t.getActualTypeArguments()[0])
+            .findFirst().map(Class.class::cast).orElse(null);
     }
 
     void addUntypedEventHandler(UntypedEventHandler handler, Map<String, Object> properties) {
-        doAddEventHandler(topicsToUntypedHandlers, handler, null, properties);
+        doAddEventHandler(topicsToUntypedHandlers, knownUntypedHandlers, handler, null, properties);
     }
 
-    private <T> void doAddEventHandler(Map<String, Map<T, Filter>> map, T handler, String defaultTopic,
-            Map<String, Object> properties) {
+    private <T> void doAddEventHandler(Map<String, Map<T, Filter>> map, Map<Long, T> idMap, 
+            T handler, String defaultTopic, Map<String, Object> properties) {
 
         Object prop = properties.get(TypedEventConstants.TYPED_EVENT_TOPICS);
 
@@ -166,17 +214,13 @@
             return;
         }
 
-        doAddToMap(map, handler, x -> f, topicList, serviceId);
-    }
-
-    private <T, U> void doAddToMap(Map<String, Map<T, U>> map, T handler, Function<String, U> valueSupplier,
-            List<String> list, Long serviceId) {
         synchronized (lock) {
-            knownHandlers.put(serviceId, list);
-
-            list.forEach(s -> {
-                Map<T, U> handlers = map.computeIfAbsent(s, x -> new HashMap<>());
-                handlers.put(handler, valueSupplier.apply(s));
+            knownHandlers.put(serviceId, topicList);
+            idMap.put(serviceId, handler);
+        
+            topicList.forEach(s -> {
+                Map<T, Filter> handlers = map.computeIfAbsent(s, x1 -> new HashMap<>());
+                handlers.put(handler, f);
             });
         }
     }
@@ -185,7 +229,7 @@
 
         Long serviceId = getServiceId(properties);
 
-        doRemoveEventHandler(topicsToTypedHandlers, handler, serviceId);
+        doRemoveEventHandler(topicsToTypedHandlers, knownTypedHandlers, handler, serviceId);
 
         synchronized (lock) {
             typedHandlersToTargetClasses.remove(handler);
@@ -196,7 +240,7 @@
 
         Long serviceId = getServiceId(properties);
 
-        doRemoveEventHandler(topicsToUntypedHandlers, handler, serviceId);
+        doRemoveEventHandler(topicsToUntypedHandlers, knownUntypedHandlers, handler, serviceId);
     }
 
     private Long getServiceId(Map<String, Object> properties) {
@@ -221,9 +265,11 @@
         }
     }
 
-    private <T, U> void doRemoveEventHandler(Map<String, Map<T, U>> map, T handler, Long serviceId) {
+    private <T, U> void doRemoveEventHandler(Map<String, Map<T, U>> map, Map<Long, T> idMap, 
+            T handler, Long serviceId) {
         synchronized (lock) {
             List<String> consumed = knownHandlers.remove(serviceId);
+            knownHandlers.remove(serviceId);
             if (consumed != null) {
                 consumed.forEach(s -> {
                     Map<T, ?> handlers = map.get(s);
@@ -238,23 +284,32 @@
         }
     }
 
-    void updatedTypedEventHandler(TypedEventHandler<?> handler, Map<String, Object> properties) {
-        // TODO try to extract topic name reflectively
-        String defaultTopic = null;
-        doUpdatedEventHandler(topicsToTypedHandlers, handler, defaultTopic, properties);
+    void updatedTypedEventHandler(Map<String, Object> properties) {
+        Long serviceId = getServiceId(properties);
+        TypedEventHandler<?> handler;
+        synchronized (lock) {
+            handler = knownTypedHandlers.get(serviceId);
+        }
+        
+        Class<?> clazz = discoverTypeForTypedHandler(handler, properties);
+        
+        String defaultTopic = clazz == null ? null : clazz.getName().replace(".", "/");
+        
+        doUpdatedEventHandler(topicsToTypedHandlers, knownTypedHandlers, defaultTopic, properties);
     }
 
-    void updatedUntypedEventHandler(UntypedEventHandler handler, Map<String, Object> properties) {
-        doUpdatedEventHandler(topicsToUntypedHandlers, handler, null, properties);
+    void updatedUntypedEventHandler(Map<String, Object> properties) {
+        doUpdatedEventHandler(topicsToUntypedHandlers, knownUntypedHandlers, null, properties);
     }
 
-    private <T> void doUpdatedEventHandler(Map<String, Map<T, Filter>> map, T handler, String defaultTopic,
+    private <T> void doUpdatedEventHandler(Map<String, Map<T, Filter>> map, Map<Long,T> idToHandler, String defaultTopic,
             Map<String, Object> properties) {
         Long serviceId = getServiceId(properties);
 
         synchronized (lock) {
-            doRemoveEventHandler(map, handler, serviceId);
-            doAddEventHandler(map, handler, defaultTopic, properties);
+            T handler = idToHandler.get(serviceId);
+            doRemoveEventHandler(map, idToHandler, handler, serviceId);
+            doAddEventHandler(map, idToHandler, handler, defaultTopic, properties);
         }
     }
 
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent.java
new file mode 100644
index 0000000..ab2fee1
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.common;
+
+public class TestEvent {
+    public String message;
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2.java
new file mode 100644
index 0000000..e91c832
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.common;
+
+public class TestEvent2 {
+    public TestEvent subEvent;
+    public EventType eventType;
+    
+    public static TestEvent2 create(TestEvent event) {
+        TestEvent2 event2 = new TestEvent2();
+        event2.subEvent = event;
+        event2.eventType = EventType.RED;
+        return event2;
+    }
+    
+    
+    public static enum EventType {
+        RED, GREEN, BLUE;
+    }
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2Consumer.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2Consumer.java
new file mode 100644
index 0000000..cdf8d08
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2Consumer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.common;
+
+import org.osgi.service.typedevent.TypedEventHandler;
+
+public interface TestEvent2Consumer extends TypedEventHandler<TestEvent2> {
+
+}
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEventConsumer.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEventConsumer.java
new file mode 100644
index 0000000..2f78de7
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEventConsumer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.common;
+
+import org.osgi.service.typedevent.TypedEventHandler;
+
+public interface TestEventConsumer extends TypedEventHandler<TestEvent> {
+
+}
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
index 6b8c67f..7b3996b 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
@@ -19,7 +19,8 @@
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.quality.Strictness.LENIENT;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -29,12 +30,10 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.MockitoAnnotations;
 import org.osgi.framework.Constants;
 import org.osgi.service.typedevent.TypedEventConstants;
 import org.osgi.service.typedevent.TypedEventHandler;
@@ -42,10 +41,12 @@
 import org.osgi.service.typedevent.UntypedEventHandler;
 import org.osgi.util.converter.Converters;
 
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = LENIENT)
 public class TypedEventBusImplTest {
 
+    private static final String SPECIAL_TEST_EVENT_TOPIC = SpecialTestEvent.class.getName().replace(".", "/");
+
+    private static final String TEST_EVENT_TOPIC = TestEvent.class.getName().replace(".", "/");
+
     public static class TestEvent {
         public String message;
     }
@@ -53,14 +54,18 @@
     public static class TestEvent2 {
         public int count;
     }
+    
+    public static class SpecialTestEvent extends TestEvent {
+        
+    }
 
-    @Mock
+    @Mock(lenient = true)
     TypedEventHandler<Object> handlerA, handlerB;
 
-    @Mock
+    @Mock(lenient = true)
     UntypedEventHandler untypedHandlerA, untypedHandlerB;
 
-    @Mock
+    @Mock(lenient = true)
     UnhandledEventHandler unhandledHandler;
 
     Semaphore semA = new Semaphore(0), semB = new Semaphore(0), untypedSemA = new Semaphore(0),
@@ -69,9 +74,13 @@
     TypedEventBusImpl impl;
     TypedEventMonitorImpl monitorImpl;
 
+    private AutoCloseable mocks;
+
     @BeforeEach
     public void start() {
 
+        mocks = MockitoAnnotations.openMocks(this);
+        
         Mockito.doAnswer(i -> {
             semA.release();
             return null;
@@ -104,9 +113,10 @@
     }
 
     @AfterEach
-    public void stop() {
+    public void stop() throws Exception {
         impl.stop();
         monitorImpl.destroy();
+        mocks.close();
     }
 
     /**
@@ -122,7 +132,7 @@
 
         Map<String, Object> serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
         serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
         serviceProperties.put(Constants.SERVICE_ID, 42L);
 
@@ -138,7 +148,7 @@
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
         serviceProperties.put(Constants.SERVICE_ID, 44L);
 
         impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
@@ -167,6 +177,73 @@
         assertFalse(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
 
     }
+    
+    public static class TestEventHandler implements TypedEventHandler<TestEvent> {
+
+        @Override
+        public void notify(String topic, TestEvent event) {
+            // No op
+        }
+    }
+
+    public static interface TestEventHandlerIface extends TypedEventHandler<TestEvent> {
+        
+    }
+    
+    /**
+     * Tests that reified typedEventHandlers are properly processed
+     * 
+     * @throws InterruptedException
+     */
+    @Test
+    public void testGenericTypeInference() throws InterruptedException {
+        
+        TypedEventHandler<TestEvent> handler = Mockito.spy(TestEventHandler.class);
+        TypedEventHandler<TestEvent> handler2 = Mockito.spy(TestEventHandler.class);
+        TypedEventHandler<TestEvent> handler3 = Mockito.mock(TestEventHandlerIface.class);
+        
+        TestEvent event = new TestEvent();
+        event.message = "boo";
+        
+        Map<String, Object> serviceProperties = new HashMap<>();
+        serviceProperties.put(Constants.SERVICE_ID, 42L);
+        
+        impl.addTypedEventHandler(handler, serviceProperties);
+        
+        serviceProperties = new HashMap<>();
+        
+        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, SpecialTestEvent.class.getName());
+        serviceProperties.put(Constants.SERVICE_ID, 43L);
+        
+        impl.addTypedEventHandler(handler2, serviceProperties);
+
+        serviceProperties = new HashMap<>();
+        
+        serviceProperties.put(Constants.SERVICE_ID, 44L);
+        
+        impl.addTypedEventHandler(handler3, serviceProperties);
+        
+        impl.deliver(event);
+        
+        Mockito.verify(handler, Mockito.timeout(1000)).notify(eq(TEST_EVENT_TOPIC), argThat(isTestEventWithMessage("boo")));
+        Mockito.verify(handler3, Mockito.timeout(1000)).notify(eq(TEST_EVENT_TOPIC), argThat(isTestEventWithMessage("boo")));
+        
+        Mockito.verify(handler2, Mockito.after(1000).never()).notify(Mockito.anyString(), Mockito.any());
+
+        
+        event = new SpecialTestEvent();
+        event.message = "far";
+        impl.deliver(event);
+        
+        Mockito.verify(handler, Mockito.after(1000).never()).notify(eq(SPECIAL_TEST_EVENT_TOPIC), Mockito.any());
+        Mockito.verify(handler3, Mockito.after(1000).never()).notify(eq(SPECIAL_TEST_EVENT_TOPIC), Mockito.any());
+        
+        Mockito.verify(handler2, Mockito.timeout(1000)).notify(eq(SPECIAL_TEST_EVENT_TOPIC), 
+                argThat(isSpecialTestEventWithMessage("far")));
+        
+        
+        
+    }
 
     /**
      * Tests that events are delivered to Smart Behaviours based on type
@@ -237,7 +314,7 @@
 
         Map<String, Object> serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
         serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
         serviceProperties.put("event.filter", "(message=foo)");
         serviceProperties.put(Constants.SERVICE_ID, 42L);
@@ -246,7 +323,7 @@
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
         serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
         serviceProperties.put("event.filter", "(message=bar)");
         serviceProperties.put(Constants.SERVICE_ID, 43L);
@@ -255,7 +332,7 @@
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
         serviceProperties.put("event.filter", "(message=foo)");
         serviceProperties.put(Constants.SERVICE_ID, 44L);
 
@@ -263,7 +340,7 @@
 
         serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
         serviceProperties.put("event.filter", "(message=bar)");
         serviceProperties.put(Constants.SERVICE_ID, 45L);
 
@@ -318,7 +395,7 @@
 
         Map<String, Object> serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
         serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
         serviceProperties.put("event.filter", "");
         serviceProperties.put(Constants.SERVICE_ID, 42L);
@@ -343,7 +420,7 @@
 
         Map<String, Object> serviceProperties = new HashMap<>();
 
-        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+        serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
         serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
         serviceProperties.put("event.filter", "(message=foo)");
         serviceProperties.put(Constants.SERVICE_ID, 42L);
@@ -382,15 +459,25 @@
 
     }
 
-    ArgumentMatcher<Object> isTestEventWithMessage(String message) {
-        return new ArgumentMatcher<Object>() {
+    ArgumentMatcher<TestEvent> isTestEventWithMessage(String message) {
+        return new ArgumentMatcher<TestEvent>() {
 
             @Override
-            public boolean matches(Object argument) {
+            public boolean matches(TestEvent argument) {
                 return argument instanceof TestEvent && message.equals(((TestEvent) argument).message);
             }
         };
     }
+    
+    ArgumentMatcher<SpecialTestEvent> isSpecialTestEventWithMessage(String message) {
+        return new ArgumentMatcher<SpecialTestEvent>() {
+            
+            @Override
+            public boolean matches(SpecialTestEvent argument) {
+                return argument instanceof SpecialTestEvent && message.equals(((SpecialTestEvent) argument).message);
+            }
+        };
+    }
 
     ArgumentMatcher<Map<String, Object>> isUntypedTestEventWithMessage(String message) {
         return new ArgumentMatcher<Map<String, Object>>() {
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/AbstractIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/AbstractIntegrationTest.java
new file mode 100644
index 0000000..81cd6d2
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/AbstractIntegrationTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.osgi;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEvent2;
+import org.junit.jupiter.api.AfterEach;
+import org.mockito.ArgumentMatcher;
+import org.osgi.framework.ServiceRegistration;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ * 
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+public abstract class AbstractIntegrationTest {
+    
+    protected static final String TEST_EVENT_TOPIC = TestEvent.class.getName().replace(".", "/");
+
+    protected static final String TEST_EVENT_2_TOPIC = TestEvent2.class.getName().replace(".", "/");
+    
+    protected final List<ServiceRegistration<?>> regs = new ArrayList<ServiceRegistration<?>>();
+    
+    @AfterEach
+    public void tearDown() throws Exception {
+        regs.forEach(sr -> {
+            try {
+                sr.unregister();
+            } catch (Exception e) { }
+        });
+    }
+    
+    protected ArgumentMatcher<TestEvent> isTestEventWithMessage(String message) {
+        return new ArgumentMatcher<TestEvent>() {
+            
+            @Override
+            public boolean matches(TestEvent argument) {
+                return message.equals(argument.message);
+            }
+        };
+    }
+
+    protected ArgumentMatcher<TestEvent2> isTestEvent2WithMessage(String message) {
+        return new ArgumentMatcher<TestEvent2>() {
+            
+            @Override
+            public boolean matches(TestEvent2 argument) {
+                return message.equals(argument.subEvent.message);
+            }
+        };
+    }
+    
+    protected ArgumentMatcher<Map<String, Object>> isUntypedTestEventWithMessage(String message) {
+        return new ArgumentMatcher<Map<String, Object>>() {
+            
+            @Override
+            public boolean matches(Map<String, Object> argument) {
+                return argument != null && message.equals(argument.get("message"));
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/EventDeliveryIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/EventDeliveryIntegrationTest.java
new file mode 100644
index 0000000..40543fa
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/EventDeliveryIntegrationTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.osgi;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEvent2;
+import org.apache.aries.typedevent.bus.common.TestEvent2.EventType;
+import org.apache.aries.typedevent.bus.common.TestEvent2Consumer;
+import org.apache.aries.typedevent.bus.common.TestEventConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.TypedEventConstants;
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.service.typedevent.UntypedEventHandler;
+import org.osgi.test.common.annotation.InjectBundleContext;
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ * 
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class EventDeliveryIntegrationTest extends AbstractIntegrationTest {
+    
+    @InjectBundleContext
+    BundleContext context;
+    
+    @InjectService
+    TypedEventBus eventBus;
+    
+    @Mock
+    TestEventConsumer typedEventHandler;
+
+    @Mock
+    TestEvent2Consumer typedEventHandler2;
+
+    @Mock
+    UntypedEventHandler untypedEventHandler, untypedEventHandler2;
+
+    private AutoCloseable mocks;
+    
+    @BeforeEach
+    public void setupMocks() {
+        mocks = MockitoAnnotations.openMocks(this);
+    }
+    
+    @AfterEach
+    public void stop() throws Exception {
+        mocks.close();
+    }
+    
+    /**
+     * Tests that events are delivered to untyped Event Handlers
+     * based on topic
+     * 
+     * @throws InterruptedException
+     */
+    @Test
+    public void testEventReceiving() throws InterruptedException {
+        
+        TestEvent event = new TestEvent();
+        event.message = "boo";
+        
+        Dictionary<String, Object> props = new Hashtable<>();
+        
+        regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
+
+        regs.add(context.registerService(TypedEventHandler.class, typedEventHandler2, props));
+        
+        eventBus.deliver(event);
+        
+        Mockito.verify(typedEventHandler, Mockito.timeout(1000)).notify(
+                Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("boo")));
+
+        Mockito.verify(typedEventHandler2, Mockito.after(1000).never()).notify(
+                Mockito.eq(TEST_EVENT_TOPIC), Mockito.any());
+    }
+
+    /**
+     * Tests that events are delivered to untyped Event Handlers
+     * based on topic
+     * 
+     * @throws InterruptedException
+     */
+    @Test
+    public void testEventReceivingUntyped() throws InterruptedException {
+        
+        TestEvent event = new TestEvent();
+        event.message = "boo";
+        
+        Dictionary<String, Object> props = new Hashtable<>();
+        props.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+        
+        regs.add(context.registerService(UntypedEventHandler.class, untypedEventHandler, props));
+        
+        props = new Hashtable<>();
+        
+        props.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_2_TOPIC);
+        
+        regs.add(context.registerService(UntypedEventHandler.class, untypedEventHandler2, props));
+        
+        
+        eventBus.deliver(event);
+        
+        Mockito.verify(untypedEventHandler, Mockito.timeout(1000)).notifyUntyped(
+                Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isUntypedTestEventWithMessage("boo")));
+
+        Mockito.verify(untypedEventHandler2, Mockito.after(1000).never()).notifyUntyped(
+                Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isUntypedTestEventWithMessage("boo")));
+        
+    }
+    
+    @Test
+    public void testSendComplexEvent() throws Exception {
+        Dictionary<String, Object> props = new Hashtable<>();
+        
+        regs.add(context.registerService(TypedEventHandler.class, typedEventHandler2, props));
+        
+        TestEvent event = new TestEvent();
+        event.message = "foo";
+        
+        TestEvent2 event2 = TestEvent2.create(event);
+        
+        eventBus.deliver(event2);
+        
+        
+        Mockito.verify(typedEventHandler2, Mockito.timeout(1000))
+            .notify(Mockito.eq(TEST_EVENT_2_TOPIC), Mockito.argThat(isTestEvent2WithMessage("foo")));
+    }
+
+    @Test
+    public void testSendComplexEventToUntypedReceiver() throws Exception {
+        Dictionary<String, Object> props = new Hashtable<>();
+        props.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_2_TOPIC);
+        
+        regs.add(context.registerService(UntypedEventHandler.class, 
+                untypedEventHandler, props));
+        
+        TestEvent event = new TestEvent();
+        event.message = "foo";
+        
+        TestEvent2 event2 = TestEvent2.create(event);
+        
+        eventBus.deliver(event2);
+        
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<Map<String, Object>> captor = ArgumentCaptor.forClass(Map.class);
+        
+        Mockito.verify(untypedEventHandler, Mockito.timeout(1000))
+            .notifyUntyped(eq(TEST_EVENT_2_TOPIC), captor.capture());
+        
+        Map<String, Object> map = captor.getValue();
+        
+        // Should be a String not an enum as we can't see the types
+        assertEquals("RED", map.get("eventType"));
+        @SuppressWarnings("unchecked")
+        Map<String, Object> subMap = (Map<String, Object>) map.get("subEvent");
+        
+        assertEquals("foo", subMap.get("message"));
+    }
+    
+    @Test
+    public void testSendComplexUntypedEventToTypedReceiver() throws Exception {
+        Dictionary<String, Object> props = new Hashtable<>();
+        
+        regs.add(context.registerService(TypedEventHandler.class, 
+                typedEventHandler2, props));
+        
+        Map<String, Object> event = new HashMap<>();
+        event.put("message", "foo");
+        
+        Map<String, Object> event2 = new HashMap<>();
+        event2.put("subEvent", event);
+        event2.put("eventType", "BLUE");
+        
+        eventBus.deliver(TEST_EVENT_2_TOPIC, event2);
+        
+        ArgumentCaptor<TestEvent2> captor = ArgumentCaptor.forClass(TestEvent2.class);
+        
+        Mockito.verify(typedEventHandler2, Mockito.timeout(1000))
+            .notify(eq(TEST_EVENT_2_TOPIC), captor.capture());
+        
+        TestEvent2 received = captor.getValue();
+        
+        // Should be a String not an enum as we can't see the types
+        assertEquals(EventType.BLUE, received.eventType);
+        
+        assertEquals("foo", received.subEvent.message);
+    }
+    
+    
+    
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java
new file mode 100644
index 0000000..d4dcc78
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or eventBusied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.osgi;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEventConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.test.common.annotation.InjectBundleContext;
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ * 
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class FilterIntegrationTest extends AbstractIntegrationTest {
+    
+    @InjectBundleContext
+    BundleContext context;
+    
+    @InjectService
+    TypedEventBus eventBus;
+    
+    @Mock
+    TestEventConsumer typedEventHandler, typedEventHandlerB;
+
+    private AutoCloseable mocks;
+    
+    @BeforeEach
+    public void setupMocks() {
+        mocks = MockitoAnnotations.openMocks(this);
+    }
+    
+    @AfterEach
+    public void stop() throws Exception {
+        mocks.close();
+    }
+    
+    @Test
+    public void testFilteredListener() throws Exception {
+        Dictionary<String, Object> props = new Hashtable<>();
+        props.put("event.filter", "(message=foo)");
+        
+        regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
+        
+        props = new Hashtable<>();
+        props.put("event.filter", "(message=bar)");
+        
+        regs.add(context.registerService(TypedEventHandler.class, typedEventHandlerB, props));
+        
+        TestEvent event = new TestEvent();
+        event.message = "foo";
+        
+        eventBus.deliver(event);
+        
+        Mockito.verify(typedEventHandler, Mockito.timeout(1000))
+            .notify(Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("foo")));
+
+        Mockito.verify(typedEventHandlerB, Mockito.after(1000).never())
+            .notify(Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("foo")));
+        
+        
+        event = new TestEvent();
+        event.message = "bar";
+        
+        eventBus.deliver(event);
+        
+        Mockito.verify(typedEventHandlerB, Mockito.timeout(1000))
+            .notify(Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("bar")));
+
+        Mockito.verify(typedEventHandler, Mockito.after(1000).never())
+            .notify(Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("bar")));
+    }
+
+    @Test
+    public void testFilteredListenerEmptyString() throws Exception {
+        Dictionary<String, Object> props = new Hashtable<>();
+        props.put("event.filter", "");
+        
+        
+        regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
+        
+        TestEvent event = new TestEvent();
+        event.message = "foo";
+        
+        eventBus.deliver(event);
+        
+        Mockito.verify(typedEventHandler, Mockito.timeout(1000))
+            .notify(Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("foo")));
+    }
+    
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
new file mode 100644
index 0000000..3c6d5c8
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or eventBusied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.osgi;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.InvocationTargetException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEventConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.service.typedevent.monitor.MonitorEvent;
+import org.osgi.service.typedevent.monitor.TypedEventMonitor;
+import org.osgi.test.common.annotation.InjectBundleContext;
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+import org.osgi.util.promise.Promise;
+
+
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class TypedEventMonitorIntegrationTest extends AbstractIntegrationTest {
+
+    TypedEventMonitor monitor;
+    
+    TypedEventBus eventBus;
+    
+    @Mock
+    TestEventConsumer typedEventHandler;
+
+    private AutoCloseable mocks;
+
+    private static Bundle eventBusBundle;
+    
+    @BeforeAll
+    public static void clearInitialHistory(@InjectBundleContext BundleContext ctx) throws Exception {
+        eventBusBundle = getBusBundle(ctx);
+    
+        eventBusBundle.stop();
+        eventBusBundle.start();
+    }
+    
+    private static Bundle getBusBundle(BundleContext ctx) {
+        return Arrays.stream(ctx.getBundles())
+                .filter(b -> "org.apache.aries.typedevent.bus".equals(b.getSymbolicName()))
+                .findAny().orElse(null);
+    }
+
+    @BeforeEach
+    public void setupMocks() {
+        mocks = MockitoAnnotations.openMocks(this);
+    }
+
+    /**
+     * Inject services every time as we restart the eventBus after each test
+     * @param monitor
+     * @param bus
+     */
+    @BeforeEach
+    public void setupMocks(@InjectService TypedEventMonitor monitor, @InjectService TypedEventBus bus) {
+        this.monitor = monitor;
+        this.eventBus = bus;
+    }
+    
+    @AfterEach
+    public void stop() throws Exception {
+        mocks.close();
+
+        // Needed to clear history from previous tests
+        eventBusBundle.stop();
+        eventBusBundle.start();
+    }
+
+    /**
+     * Tests that events are delivered to the monitor
+     *
+     * @throws InterruptedException
+     * @throws InvocationTargetException
+     */
+    @Test
+    public void testTypedEventMonitor1() throws InterruptedException, InvocationTargetException {
+
+        Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents()
+                .limit(2)
+                .collect(Collectors.toList());
+
+        TestEvent event = new TestEvent();
+        event.message = "boo";
+
+        Dictionary<String, Object> props = new Hashtable<>();
+
+        regs.add(eventBusBundle.getBundleContext().registerService(TypedEventHandler.class, typedEventHandler, props));
+
+        eventBus.deliver(event);
+
+        event = new TestEvent();
+        event.message = "bam";
+
+        eventBus.deliver(event);
+
+
+        Mockito.verify(typedEventHandler, Mockito.timeout(2000)).notify(
+                Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("boo")));
+        Mockito.verify(typedEventHandler, Mockito.timeout(2000)).notify(
+                Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("bam")));
+
+        List<MonitorEvent> events = eventsPromise.timeout(100).getValue();
+
+        assertEquals(2, events.size());
+
+        assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+        assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+
+        assertEquals("boo", events.get(0).eventData.get("message"));
+        assertEquals("bam", events.get(1).eventData.get("message"));
+
+
+    }
+
+
+    /**
+     * Tests that events are delivered to the monitor even when nobody is listening
+     *
+     * @throws InterruptedException
+     * @throws InvocationTargetException
+     */
+    @Test
+    public void testTypedEventMonitor2() throws InterruptedException, InvocationTargetException {
+
+        Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents()
+                .limit(2)
+                .collect(Collectors.toList());
+
+        TestEvent event = new TestEvent();
+        event.message = "boo";
+
+        eventBus.deliver(event);
+
+        event = new TestEvent();
+        event.message = "bam";
+
+        eventBus.deliver(event);
+
+        List<MonitorEvent> events = eventsPromise.timeout(2000).getValue();
+
+        assertEquals(2, events.size());
+
+        assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+        assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+
+        assertEquals("boo", events.get(0).eventData.get("message"));
+        assertEquals("bam", events.get(1).eventData.get("message"));
+
+
+    }
+
+    /**
+     * Tests that event history is delivered to the monitor
+     *
+     * @throws InterruptedException
+     * @throws InvocationTargetException
+     */
+    @Test
+    public void testTypedEventMonitorHistory1() throws InterruptedException, InvocationTargetException {
+
+        TestEvent event = new TestEvent();
+        event.message = "boo";
+
+        eventBus.deliver(event);
+
+        event = new TestEvent();
+        event.message = "bam";
+
+        eventBus.deliver(event);
+
+        Thread.sleep(500);
+
+        Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents()
+                .limit(Duration.ofSeconds(1))
+                .collect(Collectors.toList())
+                .timeout(2000);
+
+        List<MonitorEvent> events = eventsPromise.getValue();
+
+        assertTrue(events.isEmpty());
+
+        eventsPromise = monitor.monitorEvents(5)
+                .limit(Duration.ofSeconds(1))
+                .collect(Collectors.toList())
+                .timeout(2000);
+
+        events = eventsPromise.getValue();
+
+        assertEquals(2, events.size(), events.toString());
+
+        assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+        assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+
+        assertEquals("boo", events.get(0).eventData.get("message"));
+        assertEquals("bam", events.get(1).eventData.get("message"));
+
+        eventsPromise = monitor.monitorEvents(1)
+                .limit(Duration.ofSeconds(1))
+                .collect(Collectors.toList())
+                .timeout(2000);
+
+        events = eventsPromise.getValue();
+
+        assertEquals(1, events.size());
+
+        assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+
+        assertEquals("bam", events.get(0).eventData.get("message"));
+
+
+    }
+
+    /**
+     * Tests that event history is delivered to the monitor
+     *
+     * @throws InterruptedException
+     * @throws InvocationTargetException
+     */
+    @Test
+    public void testTypedEventMonitorHistory2() throws InterruptedException, InvocationTargetException {
+
+        Instant beforeFirst = Instant.now().minus(Duration.ofMillis(500));
+
+        TestEvent event = new TestEvent();
+        event.message = "boo";
+
+        eventBus.deliver(event);
+
+        Instant afterFirst = Instant.now().plus(Duration.ofMillis(500));
+
+        Thread.sleep(1000);
+
+        event = new TestEvent();
+        event.message = "bam";
+
+        eventBus.deliver(event);
+
+        Instant afterSecond = Instant.now().plus(Duration.ofMillis(500));
+
+        Thread.sleep(500);
+
+        Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents()
+                .limit(Duration.ofSeconds(1))
+                .collect(Collectors.toList())
+                .timeout(2000);
+
+
+        List<MonitorEvent> events = eventsPromise.getValue();
+
+        assertTrue(events.isEmpty());
+
+        eventsPromise = monitor.monitorEvents(beforeFirst)
+                .limit(Duration.ofSeconds(1))
+                .collect(Collectors.toList())
+                .timeout(2000);
+
+        events = eventsPromise.getValue();
+
+        assertEquals(2, events.size());
+
+        assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+        assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+
+        assertEquals("boo", events.get(0).eventData.get("message"));
+        assertEquals("bam", events.get(1).eventData.get("message"));
+
+        eventsPromise = monitor.monitorEvents(afterFirst)
+                .limit(Duration.ofSeconds(1))
+                .collect(Collectors.toList())
+                .timeout(2000);
+
+        events = eventsPromise.getValue();
+
+        assertEquals(1, events.size());
+
+        assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+
+        assertEquals("bam", events.get(0).eventData.get("message"));
+
+        eventsPromise = monitor.monitorEvents(afterSecond)
+                .limit(Duration.ofSeconds(1))
+                .collect(Collectors.toList())
+                .timeout(2000);
+
+        events = eventsPromise.getValue();
+
+        assertTrue(events.isEmpty());
+    }
+
+}
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
new file mode 100644
index 0000000..c781b62
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.osgi;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEventConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.service.typedevent.UnhandledEventHandler;
+import org.osgi.service.typedevent.UntypedEventHandler;
+import org.osgi.test.common.annotation.InjectBundleContext;
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ * 
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class UnhandledEventHandlerIntegrationTest extends AbstractIntegrationTest {
+    
+    @InjectBundleContext
+    BundleContext context;
+    
+    @InjectService
+    TypedEventBus eventBus;
+    
+    @Mock
+    TestEventConsumer typedEventHandler;
+
+    @Mock
+    UntypedEventHandler untypedEventHandler;
+
+    @Mock
+    UnhandledEventHandler unhandledEventHandler;
+
+    private AutoCloseable mocks;
+    
+    @BeforeEach
+    public void setupMocks() {
+        mocks = MockitoAnnotations.openMocks(this);
+    }
+    
+    @AfterEach
+    public void stop() throws Exception {
+        mocks.close();
+    }
+    
+    /**
+     * Tests that the unhandledEventHandler gets called appropriately
+     * @throws InterruptedException
+     */
+    @Test
+    public void testUnhandledDueToTopic() throws InterruptedException {
+        
+        Dictionary<String, Object> props = new Hashtable<>();
+        
+        regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
+        
+        props = new Hashtable<>();
+        
+        regs.add(context.registerService(UnhandledEventHandler.class, unhandledEventHandler, props));
+        
+        
+        TestEvent event = new TestEvent();
+        event.message = "foo";
+        
+        eventBus.deliver(event);
+        
+        verify(typedEventHandler, timeout(1000)).notify(eq(TEST_EVENT_TOPIC), 
+                argThat(isTestEventWithMessage("foo")));
+        
+        verify(unhandledEventHandler, after(1000).never()).notifyUnhandled(anyString(), anyMap());
+        
+        
+        eventBus.deliver("anotherTopic", event);
+        
+        verify(typedEventHandler, after(1000).never()).notify(eq("anotherTopic"), any());
+        
+        verify(unhandledEventHandler, timeout(1000)).notifyUnhandled(eq("anotherTopic"), 
+                argThat(isUntypedTestEventWithMessage("foo")));
+
+    }
+
+    /**
+     * Tests that the consumer of last resort gets called appropriately
+     * @throws InterruptedException
+     */
+    @Test
+    public void testUnhandledDueToFilter() throws InterruptedException {
+        
+        Dictionary<String, Object> props = new Hashtable<>();
+        props.put("event.filter", "(message=foo)");
+        
+        regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
+        
+        props = new Hashtable<>();
+        
+        regs.add(context.registerService(UnhandledEventHandler.class, unhandledEventHandler, props));
+        
+        
+        TestEvent event = new TestEvent();
+        event.message = "foo";
+        
+        eventBus.deliver(event);
+        
+        verify(typedEventHandler, timeout(1000)).notify(eq(TEST_EVENT_TOPIC), 
+                argThat(isTestEventWithMessage("foo")));
+        
+        verify(unhandledEventHandler, after(1000).never()).notifyUnhandled(anyString(), anyMap());
+        
+        
+        event = new TestEvent();
+        event.message = "bar";
+        
+        
+        eventBus.deliver(event);
+        
+        verify(typedEventHandler, after(1000).never()).notify(eq(TEST_EVENT_TOPIC), 
+                argThat(isTestEventWithMessage("bar")));
+        
+        verify(unhandledEventHandler, timeout(1000)).notifyUnhandled(eq(TEST_EVENT_TOPIC), 
+                argThat(isUntypedTestEventWithMessage("bar")));
+        
+    }
+    
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/test.bndrun b/org.apache.aries.typedevent.bus/test.bndrun
new file mode 100644
index 0000000..6735898
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/test.bndrun
@@ -0,0 +1,53 @@
+#    Licensed to the Apache Software Foundation (ASF) under one
+#    or more contributor license agreements.  See the NOTICE file
+#    distributed with this work for additional information
+#    regarding copyright ownership.  The ASF licenses this file
+#    to you under the Apache License, Version 2.0 (the
+#    "License"); you may not use this file except in compliance
+#    with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing,
+#    software distributed under the License is distributed on an
+#    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#    KIND, either express or implied.  See the License for the
+#    specific language governing permissions and limitations
+#    under the License.
+
+-tester: biz.aQute.tester.junit-platform
+
+-runfw: org.apache.felix.framework
+
+-runrequires: bnd.identity;id="org.apache.aries.typedevent.bus-tests",\
+  bnd.identity;id="junit-jupiter-engine",\
+  bnd.identity;id="junit-platform-launcher"
+ 
+-runsystempackages: sun.reflect
+
+-resolve.effective: active
+-runbundles: \
+    ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
+    ch.qos.logback.core;version='[1.2.3,1.2.4)',\
+    org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
+    org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)',\
+    org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
+    org.apache.felix.converter;version='[1.0.14,1.0.15)',\
+    org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
+    org.osgi.util.function;version='[1.1.0,1.1.1)',\
+    org.osgi.util.promise;version='[1.1.1,1.1.2)',\
+    org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
+    slf4j.api;version='[1.7.30,1.7.31)',\
+    junit-jupiter-api;version='[5.6.2,5.6.3)',\
+    junit-platform-commons;version='[1.6.2,1.6.3)',\
+    net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\
+    net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\
+    org.apache.aries.typedevent.bus-tests;version='[0.0.1,0.0.2)',\
+    org.mockito.mockito-core;version='[3.5.10,3.5.11)',\
+    org.objenesis;version='[3.1.0,3.1.1)',\
+    org.opentest4j;version='[1.2.0,1.2.1)',\
+    org.osgi.test.common;version='[0.9.0,0.9.1)',\
+    org.osgi.test.junit5;version='[0.9.0,0.9.1)',\
+    junit-platform-engine;version='[1.6.2,1.6.3)',\
+    junit-platform-launcher;version='[1.6.2,1.6.3)',\
+    junit-jupiter-engine;version='[5.6.2,5.6.3)'
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 9ccbbbe..dfb1c98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@
     </repositories>
 
     <properties>
-        <bnd.version>5.1.0</bnd.version>
+        <bnd.version>5.1.2</bnd.version>
         <dsl.version>1.2.2</dsl.version>
         <junit.version>5.6.2</junit.version>
         <mockito.version>3.5.10</mockito.version>
@@ -113,25 +113,7 @@
             <dependency>
                 <groupId>org.slf4j</groupId>
                 <artifactId>slf4j-api</artifactId>
-                <version>1.7.0</version>
-            </dependency>
-            <dependency>
-                <groupId>org.junit.jupiter</groupId>
-                <artifactId>junit-jupiter</artifactId>
-                <version>${junit.version}</version>
-                <scope>test</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.mockito</groupId>
-                <artifactId>mockito-core</artifactId>
-                <version>${mockito.version}</version>
-                <scope>test</scope>
-            </dependency>
-            <dependency>
-                <groupId>org.mockito</groupId>
-                <artifactId>mockito-junit-jupiter</artifactId>
-                <version>${mockito.version}</version>
-                <scope>test</scope>
+                <version>1.7.30</version>
             </dependency>
         </dependencies>
     </dependencyManagement>
@@ -140,6 +122,26 @@
         <pluginManagement>
             <plugins>
                 <plugin>
+                    <groupId>org.codehaus.mojo</groupId>
+                    <artifactId>build-helper-maven-plugin</artifactId>
+                    <version>3.2.0</version>
+                    <executions>
+                        <execution>
+                            <id>regex-property</id>
+                            <goals>
+                                <goal>regex-property</goal>
+                            </goals>
+                            <configuration>
+                                <name>path.from.artifactId</name>
+                                <value>${project.artifactId}</value>
+                                <regex>\.</regex>
+                                <replacement>/</replacement>
+                                <failIfNoMatch>false</failIfNoMatch>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-compiler-plugin</artifactId>
                     <version>3.8.1</version>
@@ -157,6 +159,19 @@
                             <manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
                         </archive>
                     </configuration>
+                    <executions>
+                        <execution>
+                            <id>test-jar</id>
+                            <goals>
+                                <goal>test-jar</goal>
+                            </goals>
+                            <configuration>
+                                <archive>
+                                    <manifestFile>${project.build.testOutputDirectory}/META-INF/MANIFEST.MF</manifestFile>
+                                </archive>
+                            </configuration>
+                        </execution>
+                    </executions>
                 </plugin>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
@@ -172,6 +187,9 @@
                     <version>2.22.2</version>
                     <configuration>
                         <source>1.8</source>
+                        <excludes>
+                            <exclude>${path.from.artifactId}/osgi/**</exclude>
+                        </excludes>
                     </configuration>
                 </plugin>
                 <plugin>
@@ -185,19 +203,124 @@
                                 <goal>bnd-process</goal>
                             </goals>
                         </execution>
+                        <execution>
+                            <id>test-bnd-process</id>
+                            <goals>
+                                <goal>bnd-process-tests</goal>
+                            </goals>
+                            <configuration>
+                                <artifactFragment>true</artifactFragment>
+                                <testCases>useTestCasesHeader</testCases>
+                                <bnd><![CDATA[
+                                    Test-Cases:${select;${classes;HIERARCHY_INDIRECTLY_ANNOTATED;org.junit.platform.commons.annotation.Testable;CONCRETE};${project.artifactId}.osgi.*}
+                                ]]></bnd>
+                            </configuration>
+                        </execution>
                     </executions>
                 </plugin>
+                <plugin>
+                    <groupId>biz.aQute.bnd</groupId>
+                    <artifactId>bnd-resolver-maven-plugin</artifactId>
+                    <version>${bnd.version}</version>
+                    <executions>
+                        <execution>
+                            <id>resolve-test</id>
+                            <phase>pre-integration-test</phase>
+                            <goals>
+                                <goal>resolve</goal>
+                            </goals>
+                            <configuration>
+                                <bndruns>
+                                    <bndrun>test.bndrun</bndrun>
+                                </bndruns>
+                                <bundles>
+                                    <bundle>${project.build.directory}/${project.build.finalName}-tests.jar</bundle>
+                                </bundles>
+                                <failOnChanges>false</failOnChanges>
+                                <includeDependencyManagement>true</includeDependencyManagement>
+                                <reportOptional>false</reportOptional>
+                                <scopes>
+                                    <scope>compile</scope>
+                                    <scope>runtime</scope>
+                                    <scope>test</scope>
+                                </scopes>
+                            </configuration>
+                        </execution>
+                        <execution>
+                            <id>resolve-run</id>
+                            <goals>
+                                <goal>resolve</goal>
+                            </goals>
+                            <configuration>
+                                <bndruns>
+                                    <bndrun>run.bndrun</bndrun>
+                                </bndruns>
+                                <failOnChanges>false</failOnChanges>
+                                <includeDependencyManagement>true</includeDependencyManagement>
+                                <reportOptional>false</reportOptional>
+                                <scopes>
+                                    <scope>compile</scope>
+                                    <scope>runtime</scope>
+                                    <scope>test</scope>
+                                </scopes>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>biz.aQute.bnd</groupId>
+                    <artifactId>bnd-testing-maven-plugin</artifactId>
+                    <version>${bnd.version}</version>
+                    <executions>
+                        <execution>
+                            <id>testing</id>
+                            <goals>
+                                <goal>testing</goal>
+                            </goals>
+                            <configuration>
+                                <bndruns>
+                                    <bndrun>test.bndrun</bndrun>
+                                </bndruns>
+                                <bundles>
+                                    <bundle>${project.build.directory}/${project.build.finalName}-tests.jar</bundle>
+                                </bundles>
+                                <failOnChanges>false</failOnChanges>
+                                <includeDependencyManagement>true</includeDependencyManagement>
+                                <resolve>false</resolve>
+                                <scopes>
+                                    <scope>compile</scope>
+                                    <scope>runtime</scope>
+                                    <scope>test</scope>
+                                </scopes>
+                            </configuration>
+                        </execution>
+                    </executions>
+                </plugin>
+                <plugin>
+                    <groupId>biz.aQute.bnd</groupId>
+                    <artifactId>bnd-run-maven-plugin</artifactId>
+                    <version>${bnd.version}</version>
+                    <configuration>
+                        <bndrun>run.bndrun</bndrun>
+                        <includeDependencyManagement>true</includeDependencyManagement>
+                        <scopes>
+                            <scope>compile</scope>
+                            <scope>runtime</scope>
+                            <scope>test</scope>
+                        </scopes>
+                    </configuration>
+                </plugin>
             </plugins>
         </pluginManagement>
-
         <plugins>
             <plugin>
-                <groupId>biz.aQute.bnd</groupId>
-                <artifactId>bnd-maven-plugin</artifactId>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
             </plugin>
         </plugins>
     </build>
     <modules>
+        <module>typedevent-test-bom</module>
         <module>org.apache.aries.typedevent.bus</module>
     </modules>
 </project>
diff --git a/typedevent-test-bom/pom.xml b/typedevent-test-bom/pom.xml
new file mode 100644
index 0000000..e3c703e
--- /dev/null
+++ b/typedevent-test-bom/pom.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+    license agreements. See the NOTICE file distributed with this work for additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    you under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.aries.typedevent</groupId>
+        <artifactId>typedevent-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>typedevent-test-bom</artifactId>
+    <packaging>pom</packaging>
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.junit</groupId>
+                <artifactId>junit-bom</artifactId>
+                <version>${junit.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.mockito</groupId>
+                <artifactId>mockito-core</artifactId>
+                <version>${mockito.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.mockito</groupId>
+                <artifactId>mockito-junit-jupiter</artifactId>
+                <version>${mockito.version}</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.osgi</groupId>
+                <artifactId>org.osgi.test.junit5</artifactId>
+                <version>0.9.0</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>org.apache.felix.framework</artifactId>
+                <version>6.0.3</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.osgi</groupId>
+                <artifactId>org.osgi.service.cm</artifactId>
+                <version>1.6.0</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>org.apache.felix.configadmin</artifactId>
+                <version>1.9.18</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-classic</artifactId>
+                <version>1.2.3</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>ch.qos.logback</groupId>
+                <artifactId>logback-core</artifactId>
+                <version>1.2.3</version>
+                <scope>test</scope>
+            </dependency>
+            
+            
+            <!-- The Web Console -->
+            <dependency>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>org.apache.felix.webconsole</artifactId>
+                <version>4.3.4</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>org.apache.felix.webconsole.plugins.ds</artifactId>
+                <version>2.0.8</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>org.apache.felix.inventory</artifactId>
+                <version>1.0.4</version>
+                <scope>test</scope>
+            </dependency>
+
+            <!-- The Gogo Shell -->
+            <dependency>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>org.apache.felix.gogo.shell</artifactId>
+                <version>1.0.0</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>org.apache.felix.gogo.runtime</artifactId>
+                <version>1.0.10</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.felix</groupId>
+                <artifactId>org.apache.felix.gogo.command</artifactId>
+                <version>1.0.2</version>
+                <scope>test</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.osgi</groupId>
+                        <artifactId>org.osgi.core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.osgi</groupId>
+                        <artifactId>org.osgi.compendium</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>
\ No newline at end of file