Use the Component DSL to run the whiteboard, contribute integration tests from the BRAIN-IoT prototype
diff --git a/org.apache.aries.typedevent.bus/pom.xml b/org.apache.aries.typedevent.bus/pom.xml
index db237cf..5c38b68 100644
--- a/org.apache.aries.typedevent.bus/pom.xml
+++ b/org.apache.aries.typedevent.bus/pom.xml
@@ -22,6 +22,18 @@
<artifactId>org.apache.aries.typedevent.bus</artifactId>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.aries.typedevent</groupId>
+ <artifactId>typedevent-test-bom</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
<dependencies>
<dependency>
<groupId>org.osgi</groupId>
@@ -67,5 +79,38 @@
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.test.junit5</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-resolver-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-testing-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-run-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/run.bndrun b/org.apache.aries.typedevent.bus/run.bndrun
new file mode 100644
index 0000000..91345b1
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/run.bndrun
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+-tester: biz.aQute.tester.junit-platform
+
+-runfw: org.apache.felix.framework
+
+-runrequires: bnd.identity;id="org.apache.aries.typedevent.bus",\
+ bnd.identity;id="org.apache.felix.gogo.shell",\
+ bnd.identity;id="org.apache.felix.gogo.runtime",\
+ bnd.identity;id="org.apache.felix.gogo.command"
+
+
+-resolve.effective: active
+-runbundles: \
+ ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
+ ch.qos.logback.core;version='[1.2.3,1.2.4)',\
+ org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
+ org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)',\
+ org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
+ org.apache.felix.converter;version='[1.0.14,1.0.15)',\
+ org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
+ org.osgi.util.function;version='[1.1.0,1.1.1)',\
+ org.osgi.util.promise;version='[1.1.1,1.1.2)',\
+ org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
+ slf4j.api;version='[1.7.30,1.7.31)',\
+ org.apache.felix.gogo.command;version='[1.0.2,1.0.3)',\
+ org.apache.felix.gogo.runtime;version='[1.0.10,1.0.11)',\
+ org.apache.felix.gogo.shell;version='[1.0.0,1.0.1)'
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java
index af61b3f..06bc1d3 100644
--- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusActivator.java
@@ -18,28 +18,36 @@
package org.apache.aries.typedevent.bus.impl;
import static java.util.function.Function.identity;
+import static org.apache.aries.component.dsl.OSGi.all;
+import static org.apache.aries.component.dsl.OSGi.coalesce;
+import static org.apache.aries.component.dsl.OSGi.configuration;
+import static org.apache.aries.component.dsl.OSGi.nothing;
+import static org.apache.aries.component.dsl.OSGi.just;
+import static org.apache.aries.component.dsl.OSGi.register;
+import static org.apache.aries.component.dsl.OSGi.service;
+import static org.apache.aries.component.dsl.OSGi.serviceReferences;
import java.util.Arrays;
import java.util.Dictionary;
+import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
+import org.apache.aries.component.dsl.OSGi;
+import org.apache.aries.component.dsl.OSGiResult;
import org.osgi.annotation.bundle.Header;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceReference;
-import org.osgi.framework.ServiceRegistration;
import org.osgi.service.typedevent.TypedEventBus;
import org.osgi.service.typedevent.TypedEventHandler;
import org.osgi.service.typedevent.UnhandledEventHandler;
import org.osgi.service.typedevent.UntypedEventHandler;
import org.osgi.service.typedevent.monitor.TypedEventMonitor;
-import org.osgi.util.tracker.ServiceTracker;
-import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,15 +56,7 @@
private static final Logger _log = LoggerFactory.getLogger(TypedEventBusActivator.class);
- private TypedEventMonitorImpl monitorImpl;
- private ServiceRegistration<TypedEventMonitor> monitorReg;
-
- private TypedEventBusImpl busImpl;
- private ServiceRegistration<TypedEventBus> busReg;
-
- private ServiceTracker<TypedEventHandler<?>, TypedEventHandler<?>> typedTracker;
- private ServiceTracker<UntypedEventHandler, UntypedEventHandler> untypedTracker;
- private ServiceTracker<UnhandledEventHandler, UnhandledEventHandler> unhandledTracker;
+ OSGiResult eventBus;
@Override
public void start(BundleContext bundleContext) throws Exception {
@@ -64,120 +64,79 @@
_log.debug("Aries Typed Event Bus Starting");
}
- // TODO use Config Admin
-
- Map<String, Object> map = new HashMap<String, Object>();
-
- createEventBus(bundleContext, map);
+ eventBus = coalesce(
+ configuration("org.apache.aries.typedevent.bus"),
+ just(Hashtable::new)
+ )
+ .map(this::toConfigProps)
+ .flatMap(configuration -> createProgram(configuration))
+ .run(bundleContext);
if (_log.isDebugEnabled()) {
_log.debug("Aries Typed Event Bus Started");
}
}
- private void createEventBus(BundleContext bundleContext, Map<String, ?> configuration) throws Exception {
+ private OSGi<?> createProgram(Map<String, ?> configuration) {
- Dictionary<String, Object> serviceProps = toServiceProps(configuration);
+ Map<String, Object> serviceProps = toServiceProps(configuration);
- monitorImpl = new TypedEventMonitorImpl(configuration);
- busImpl = new TypedEventBusImpl(monitorImpl, configuration);
+ return just(configuration)
+ .map(TypedEventMonitorImpl::new)
+ .effects(x -> { }, TypedEventMonitorImpl::destroy)
+ .flatMap(
+ temi -> register(TypedEventMonitor.class, temi, serviceProps)
+ .then(just(new TypedEventBusImpl(temi, configuration))
+ .effects(TypedEventBusImpl::start, TypedEventBusImpl::stop)))
+ .flatMap(
+ tebi -> all(
+ serviceReferences(TypedEventHandler.class,
+ csr -> {
+ tebi.updatedTypedEventHandler(
+ getServiceProps(csr.getServiceReference()));
+ return false;
+ })
+ .flatMap(csr -> service(csr)
+ .effects(
+ handler -> tebi.addTypedEventHandler(handler,
+ getServiceProps(csr.getServiceReference())),
+ handler -> tebi.removeTypedEventHandler(handler,
+ getServiceProps(csr.getServiceReference())))),
+ serviceReferences(UntypedEventHandler.class,
+ csr -> {
+ tebi.updatedTypedEventHandler(
+ getServiceProps(csr.getServiceReference()));
+ return false;
+ })
+ .flatMap(csr -> service(csr)
+ .effects(
+ handler -> tebi.addUntypedEventHandler(handler,
+ getServiceProps(csr.getServiceReference())),
+ handler -> tebi.removeUntypedEventHandler(handler,
+ getServiceProps(csr.getServiceReference())))),
+ serviceReferences(UnhandledEventHandler.class)
+ .flatMap(csr -> service(csr)
+ .effects(handler -> tebi.addUnhandledEventHandler(handler,
+ getServiceProps(csr.getServiceReference())),
+ handler -> tebi.removeUnhandledEventHandler(handler,
+ getServiceProps(csr.getServiceReference())))),
+ register(TypedEventBus.class, tebi, serviceProps)
+ .flatMap(x -> nothing())));
- untypedTracker = new ServiceTracker<>(bundleContext, UntypedEventHandler.class,
- new ServiceTrackerCustomizer<UntypedEventHandler, UntypedEventHandler>() {
-
- @Override
- public UntypedEventHandler addingService(ServiceReference<UntypedEventHandler> reference) {
- UntypedEventHandler service = bundleContext.getService(reference);
- busImpl.addUntypedEventHandler(service, getServiceProps(reference));
- return service;
- }
-
- @Override
- public void modifiedService(ServiceReference<UntypedEventHandler> reference,
- UntypedEventHandler service) {
- busImpl.updatedUntypedEventHandler(service, getServiceProps(reference));
- }
-
- @Override
- public void removedService(ServiceReference<UntypedEventHandler> reference,
- UntypedEventHandler service) {
- busImpl.removeUntypedEventHandler(service, getServiceProps(reference));
- }
- });
-
- untypedTracker = new ServiceTracker<>(bundleContext, UntypedEventHandler.class,
- new ServiceTrackerCustomizer<UntypedEventHandler, UntypedEventHandler>() {
-
- @Override
- public UntypedEventHandler addingService(ServiceReference<UntypedEventHandler> reference) {
- UntypedEventHandler service = bundleContext.getService(reference);
- busImpl.addUntypedEventHandler(service, getServiceProps(reference));
- return service;
- }
-
- @Override
- public void modifiedService(ServiceReference<UntypedEventHandler> reference,
- UntypedEventHandler service) {
- busImpl.updatedUntypedEventHandler(service, getServiceProps(reference));
- }
-
- @Override
- public void removedService(ServiceReference<UntypedEventHandler> reference,
- UntypedEventHandler service) {
- busImpl.removeUntypedEventHandler(service, getServiceProps(reference));
- }
- });
-
- unhandledTracker = new ServiceTracker<>(bundleContext, UnhandledEventHandler.class,
- new ServiceTrackerCustomizer<UnhandledEventHandler, UnhandledEventHandler>() {
-
- @Override
- public UnhandledEventHandler addingService(ServiceReference<UnhandledEventHandler> reference) {
- UnhandledEventHandler service = bundleContext.getService(reference);
- busImpl.addUnhandledEventHandler(service, getServiceProps(reference));
- return service;
- }
-
- @Override
- public void modifiedService(ServiceReference<UnhandledEventHandler> reference,
- UnhandledEventHandler service) {
- }
-
- @Override
- public void removedService(ServiceReference<UnhandledEventHandler> reference,
- UnhandledEventHandler service) {
- busImpl.removeUnhandledEventHandler(service, getServiceProps(reference));
- }
- });
-
- try {
- busImpl.start();
-
- monitorReg = bundleContext.registerService(TypedEventMonitor.class, monitorImpl, serviceProps);
-
- typedTracker.open();
- untypedTracker.open();
- unhandledTracker.open();
-
- busReg = bundleContext.registerService(TypedEventBus.class, busImpl, serviceProps);
-
- } catch (Exception e) {
- stop(bundleContext);
- }
}
- private void safeUnregister(ServiceRegistration<?> reg) {
- try {
- reg.unregister();
- } catch (IllegalStateException ise) {
- // no op
- // TODO LOG this
+ private Map<String, Object> toConfigProps(Dictionary<String, ?> config) {
+ Enumeration<String> keys = config.keys();
+ Map<String, Object> map = new HashMap<>();
+ while(keys.hasMoreElements()) {
+ String key = keys.nextElement();
+ map.put(key, config.get(key));
}
-
+ return map;
}
-
- private Dictionary<String, Object> toServiceProps(Map<String, ?> config) {
+
+ private Map<String, Object> toServiceProps(Map<String, ?> config) {
return config.entrySet().stream().filter(e -> e.getKey() != null && e.getKey().startsWith("."))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue, (a, b) -> {
throw new IllegalArgumentException("Duplicate key ");
@@ -194,30 +153,7 @@
_log.debug("Aries Typed Event Bus Stopping");
}
- // Order matters here
- if (busReg != null) {
- safeUnregister(busReg);
- }
-
- if (busImpl != null) {
- busImpl.stop();
- }
-
- if (typedTracker != null) {
- typedTracker.close();
- }
- if (untypedTracker != null) {
- untypedTracker.close();
- }
- if (unhandledTracker != null) {
- unhandledTracker.close();
- }
-
- if (monitorReg != null) {
- safeUnregister(monitorReg);
- }
-
- monitorImpl.destroy();
+ eventBus.close();
if (_log.isDebugEnabled()) {
_log.debug("Aries Typed Event Bus Stopped");
diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index b23df64..5be83f9 100644
--- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -32,7 +32,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
import java.util.stream.Stream;
import org.osgi.framework.Constants;
@@ -84,6 +83,18 @@
*/
private final Map<Long, List<String>> knownHandlers = new HashMap<>();
+ /**
+ * Map access and mutation must be synchronized on {@link #lock}. Values from
+ * the map should be copied as the contents are not thread safe.
+ */
+ private final Map<Long, TypedEventHandler<?>> knownTypedHandlers = new HashMap<>();
+
+ /**
+ * Map access and mutation must be synchronized on {@link #lock}. Values from
+ * the map should be copied as the contents are not thread safe.
+ */
+ private final Map<Long, UntypedEventHandler> knownUntypedHandlers = new HashMap<>();
+
private final BlockingQueue<EventTask> queue = new LinkedBlockingQueue<>();
/**
@@ -99,47 +110,84 @@
}
void addTypedEventHandler(TypedEventHandler<?> handler, Map<String, Object> properties) {
- // TODO try to extract topic name reflectively
- String defaultTopic = null;
+ Class<?> clazz = discoverTypeForTypedHandler(handler, properties);
+
+ String defaultTopic = clazz == null ? null : clazz.getName().replace(".", "/");
+ doAddEventHandler(topicsToTypedHandlers, knownTypedHandlers, handler, defaultTopic, properties);
+ }
+
+ private Class<?> discoverTypeForTypedHandler(TypedEventHandler<?> handler, Map<String, Object> properties) {
+ Class<?> clazz = null;
Object type = properties.get(TypedEventConstants.TYPED_EVENT_TYPE);
if (type != null) {
- defaultTopic = String.valueOf(type).replace(".", "/");
try {
- Class<?> clazz = handler.getClass().getClassLoader().loadClass(String.valueOf(type));
-
- synchronized (lock) {
- typedHandlersToTargetClasses.put(handler, clazz);
- }
+ clazz = handler.getClass().getClassLoader().loadClass(String.valueOf(type));
} catch (ClassNotFoundException e) {
// TODO Blow up
e.printStackTrace();
}
} else {
- Class<?> clazz = Arrays.stream(handler.getClass().getGenericInterfaces())
- .filter(ParameterizedType.class::isInstance).map(ParameterizedType.class::cast)
- .filter(t -> TypedEventHandler.class.equals(t.getRawType())).map(t -> t.getActualTypeArguments()[0])
- .findFirst().map(Class.class::cast).orElse(null);
-
- if (clazz != null) {
- defaultTopic = String.valueOf(type).replace(".", "/");
- synchronized (lock) {
- typedHandlersToTargetClasses.put(handler, clazz);
+ Class<?> toCheck = handler.getClass();
+ outer: while(clazz == null) {
+ clazz = findDirectlyImplemented(toCheck);
+
+ if(clazz != null) {
+ break outer;
}
- } else {
- // TODO Blow Up
+
+ clazz = processInterfaceHierarchyForClass(toCheck);
+
+ if(clazz != null) {
+ break outer;
+ }
+
+ toCheck = toCheck.getSuperclass();
}
}
- doAddEventHandler(topicsToTypedHandlers, handler, defaultTopic, properties);
+ if (clazz != null) {
+ synchronized (lock) {
+ typedHandlersToTargetClasses.put(handler, clazz);
+ }
+ } else {
+ // TODO Blow Up
+ }
+ return clazz;
+ }
+
+ private Class<?> processInterfaceHierarchyForClass(Class<?> toCheck) {
+ Class<?> clazz = null;
+ for (Class<?> iface : toCheck.getInterfaces()) {
+ clazz = findDirectlyImplemented(iface);
+
+ if(clazz != null) {
+ break;
+ }
+
+ clazz = processInterfaceHierarchyForClass(iface);
+
+ if(clazz != null) {
+ break;
+ }
+ }
+ return clazz;
+ }
+
+ private Class<?> findDirectlyImplemented(Class<?> toCheck) {
+ return Arrays.stream(toCheck.getGenericInterfaces())
+ .filter(ParameterizedType.class::isInstance)
+ .map(ParameterizedType.class::cast)
+ .filter(t -> TypedEventHandler.class.equals(t.getRawType())).map(t -> t.getActualTypeArguments()[0])
+ .findFirst().map(Class.class::cast).orElse(null);
}
void addUntypedEventHandler(UntypedEventHandler handler, Map<String, Object> properties) {
- doAddEventHandler(topicsToUntypedHandlers, handler, null, properties);
+ doAddEventHandler(topicsToUntypedHandlers, knownUntypedHandlers, handler, null, properties);
}
- private <T> void doAddEventHandler(Map<String, Map<T, Filter>> map, T handler, String defaultTopic,
- Map<String, Object> properties) {
+ private <T> void doAddEventHandler(Map<String, Map<T, Filter>> map, Map<Long, T> idMap,
+ T handler, String defaultTopic, Map<String, Object> properties) {
Object prop = properties.get(TypedEventConstants.TYPED_EVENT_TOPICS);
@@ -166,17 +214,13 @@
return;
}
- doAddToMap(map, handler, x -> f, topicList, serviceId);
- }
-
- private <T, U> void doAddToMap(Map<String, Map<T, U>> map, T handler, Function<String, U> valueSupplier,
- List<String> list, Long serviceId) {
synchronized (lock) {
- knownHandlers.put(serviceId, list);
-
- list.forEach(s -> {
- Map<T, U> handlers = map.computeIfAbsent(s, x -> new HashMap<>());
- handlers.put(handler, valueSupplier.apply(s));
+ knownHandlers.put(serviceId, topicList);
+ idMap.put(serviceId, handler);
+
+ topicList.forEach(s -> {
+ Map<T, Filter> handlers = map.computeIfAbsent(s, x1 -> new HashMap<>());
+ handlers.put(handler, f);
});
}
}
@@ -185,7 +229,7 @@
Long serviceId = getServiceId(properties);
- doRemoveEventHandler(topicsToTypedHandlers, handler, serviceId);
+ doRemoveEventHandler(topicsToTypedHandlers, knownTypedHandlers, handler, serviceId);
synchronized (lock) {
typedHandlersToTargetClasses.remove(handler);
@@ -196,7 +240,7 @@
Long serviceId = getServiceId(properties);
- doRemoveEventHandler(topicsToUntypedHandlers, handler, serviceId);
+ doRemoveEventHandler(topicsToUntypedHandlers, knownUntypedHandlers, handler, serviceId);
}
private Long getServiceId(Map<String, Object> properties) {
@@ -221,9 +265,11 @@
}
}
- private <T, U> void doRemoveEventHandler(Map<String, Map<T, U>> map, T handler, Long serviceId) {
+ private <T, U> void doRemoveEventHandler(Map<String, Map<T, U>> map, Map<Long, T> idMap,
+ T handler, Long serviceId) {
synchronized (lock) {
List<String> consumed = knownHandlers.remove(serviceId);
+ knownHandlers.remove(serviceId);
if (consumed != null) {
consumed.forEach(s -> {
Map<T, ?> handlers = map.get(s);
@@ -238,23 +284,32 @@
}
}
- void updatedTypedEventHandler(TypedEventHandler<?> handler, Map<String, Object> properties) {
- // TODO try to extract topic name reflectively
- String defaultTopic = null;
- doUpdatedEventHandler(topicsToTypedHandlers, handler, defaultTopic, properties);
+ void updatedTypedEventHandler(Map<String, Object> properties) {
+ Long serviceId = getServiceId(properties);
+ TypedEventHandler<?> handler;
+ synchronized (lock) {
+ handler = knownTypedHandlers.get(serviceId);
+ }
+
+ Class<?> clazz = discoverTypeForTypedHandler(handler, properties);
+
+ String defaultTopic = clazz == null ? null : clazz.getName().replace(".", "/");
+
+ doUpdatedEventHandler(topicsToTypedHandlers, knownTypedHandlers, defaultTopic, properties);
}
- void updatedUntypedEventHandler(UntypedEventHandler handler, Map<String, Object> properties) {
- doUpdatedEventHandler(topicsToUntypedHandlers, handler, null, properties);
+ void updatedUntypedEventHandler(Map<String, Object> properties) {
+ doUpdatedEventHandler(topicsToUntypedHandlers, knownUntypedHandlers, null, properties);
}
- private <T> void doUpdatedEventHandler(Map<String, Map<T, Filter>> map, T handler, String defaultTopic,
+ private <T> void doUpdatedEventHandler(Map<String, Map<T, Filter>> map, Map<Long,T> idToHandler, String defaultTopic,
Map<String, Object> properties) {
Long serviceId = getServiceId(properties);
synchronized (lock) {
- doRemoveEventHandler(map, handler, serviceId);
- doAddEventHandler(map, handler, defaultTopic, properties);
+ T handler = idToHandler.get(serviceId);
+ doRemoveEventHandler(map, idToHandler, handler, serviceId);
+ doAddEventHandler(map, idToHandler, handler, defaultTopic, properties);
}
}
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent.java
new file mode 100644
index 0000000..ab2fee1
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.common;
+
+public class TestEvent {
+ public String message;
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2.java
new file mode 100644
index 0000000..e91c832
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.common;
+
+public class TestEvent2 {
+ public TestEvent subEvent;
+ public EventType eventType;
+
+ public static TestEvent2 create(TestEvent event) {
+ TestEvent2 event2 = new TestEvent2();
+ event2.subEvent = event;
+ event2.eventType = EventType.RED;
+ return event2;
+ }
+
+
+ public static enum EventType {
+ RED, GREEN, BLUE;
+ }
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2Consumer.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2Consumer.java
new file mode 100644
index 0000000..cdf8d08
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEvent2Consumer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.common;
+
+import org.osgi.service.typedevent.TypedEventHandler;
+
+public interface TestEvent2Consumer extends TypedEventHandler<TestEvent2> {
+
+}
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEventConsumer.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEventConsumer.java
new file mode 100644
index 0000000..2f78de7
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/common/TestEventConsumer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.common;
+
+import org.osgi.service.typedevent.TypedEventHandler;
+
+public interface TestEventConsumer extends TypedEventHandler<TestEvent> {
+
+}
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
index 6b8c67f..7b3996b 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
@@ -19,7 +19,8 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.quality.Strictness.LENIENT;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
import java.util.HashMap;
import java.util.Map;
@@ -29,12 +30,10 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.MockitoAnnotations;
import org.osgi.framework.Constants;
import org.osgi.service.typedevent.TypedEventConstants;
import org.osgi.service.typedevent.TypedEventHandler;
@@ -42,10 +41,12 @@
import org.osgi.service.typedevent.UntypedEventHandler;
import org.osgi.util.converter.Converters;
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = LENIENT)
public class TypedEventBusImplTest {
+ private static final String SPECIAL_TEST_EVENT_TOPIC = SpecialTestEvent.class.getName().replace(".", "/");
+
+ private static final String TEST_EVENT_TOPIC = TestEvent.class.getName().replace(".", "/");
+
public static class TestEvent {
public String message;
}
@@ -53,14 +54,18 @@
public static class TestEvent2 {
public int count;
}
+
+ public static class SpecialTestEvent extends TestEvent {
+
+ }
- @Mock
+ @Mock(lenient = true)
TypedEventHandler<Object> handlerA, handlerB;
- @Mock
+ @Mock(lenient = true)
UntypedEventHandler untypedHandlerA, untypedHandlerB;
- @Mock
+ @Mock(lenient = true)
UnhandledEventHandler unhandledHandler;
Semaphore semA = new Semaphore(0), semB = new Semaphore(0), untypedSemA = new Semaphore(0),
@@ -69,9 +74,13 @@
TypedEventBusImpl impl;
TypedEventMonitorImpl monitorImpl;
+ private AutoCloseable mocks;
+
@BeforeEach
public void start() {
+ mocks = MockitoAnnotations.openMocks(this);
+
Mockito.doAnswer(i -> {
semA.release();
return null;
@@ -104,9 +113,10 @@
}
@AfterEach
- public void stop() {
+ public void stop() throws Exception {
impl.stop();
monitorImpl.destroy();
+ mocks.close();
}
/**
@@ -122,7 +132,7 @@
Map<String, Object> serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+ serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put(Constants.SERVICE_ID, 42L);
@@ -138,7 +148,7 @@
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+ serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(Constants.SERVICE_ID, 44L);
impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
@@ -167,6 +177,73 @@
assertFalse(untypedSemB.tryAcquire(1, TimeUnit.SECONDS));
}
+
+ public static class TestEventHandler implements TypedEventHandler<TestEvent> {
+
+ @Override
+ public void notify(String topic, TestEvent event) {
+ // No op
+ }
+ }
+
+ public static interface TestEventHandlerIface extends TypedEventHandler<TestEvent> {
+
+ }
+
+ /**
+ * Tests that reified typedEventHandlers are properly processed
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testGenericTypeInference() throws InterruptedException {
+
+ TypedEventHandler<TestEvent> handler = Mockito.spy(TestEventHandler.class);
+ TypedEventHandler<TestEvent> handler2 = Mockito.spy(TestEventHandler.class);
+ TypedEventHandler<TestEvent> handler3 = Mockito.mock(TestEventHandlerIface.class);
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ Map<String, Object> serviceProperties = new HashMap<>();
+ serviceProperties.put(Constants.SERVICE_ID, 42L);
+
+ impl.addTypedEventHandler(handler, serviceProperties);
+
+ serviceProperties = new HashMap<>();
+
+ serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, SpecialTestEvent.class.getName());
+ serviceProperties.put(Constants.SERVICE_ID, 43L);
+
+ impl.addTypedEventHandler(handler2, serviceProperties);
+
+ serviceProperties = new HashMap<>();
+
+ serviceProperties.put(Constants.SERVICE_ID, 44L);
+
+ impl.addTypedEventHandler(handler3, serviceProperties);
+
+ impl.deliver(event);
+
+ Mockito.verify(handler, Mockito.timeout(1000)).notify(eq(TEST_EVENT_TOPIC), argThat(isTestEventWithMessage("boo")));
+ Mockito.verify(handler3, Mockito.timeout(1000)).notify(eq(TEST_EVENT_TOPIC), argThat(isTestEventWithMessage("boo")));
+
+ Mockito.verify(handler2, Mockito.after(1000).never()).notify(Mockito.anyString(), Mockito.any());
+
+
+ event = new SpecialTestEvent();
+ event.message = "far";
+ impl.deliver(event);
+
+ Mockito.verify(handler, Mockito.after(1000).never()).notify(eq(SPECIAL_TEST_EVENT_TOPIC), Mockito.any());
+ Mockito.verify(handler3, Mockito.after(1000).never()).notify(eq(SPECIAL_TEST_EVENT_TOPIC), Mockito.any());
+
+ Mockito.verify(handler2, Mockito.timeout(1000)).notify(eq(SPECIAL_TEST_EVENT_TOPIC),
+ argThat(isSpecialTestEventWithMessage("far")));
+
+
+
+ }
/**
* Tests that events are delivered to Smart Behaviours based on type
@@ -237,7 +314,7 @@
Map<String, Object> serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+ serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put("event.filter", "(message=foo)");
serviceProperties.put(Constants.SERVICE_ID, 42L);
@@ -246,7 +323,7 @@
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+ serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put("event.filter", "(message=bar)");
serviceProperties.put(Constants.SERVICE_ID, 43L);
@@ -255,7 +332,7 @@
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+ serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put("event.filter", "(message=foo)");
serviceProperties.put(Constants.SERVICE_ID, 44L);
@@ -263,7 +340,7 @@
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+ serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put("event.filter", "(message=bar)");
serviceProperties.put(Constants.SERVICE_ID, 45L);
@@ -318,7 +395,7 @@
Map<String, Object> serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+ serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put("event.filter", "");
serviceProperties.put(Constants.SERVICE_ID, 42L);
@@ -343,7 +420,7 @@
Map<String, Object> serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName().replace(".", "/"));
+ serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
serviceProperties.put("event.filter", "(message=foo)");
serviceProperties.put(Constants.SERVICE_ID, 42L);
@@ -382,15 +459,25 @@
}
- ArgumentMatcher<Object> isTestEventWithMessage(String message) {
- return new ArgumentMatcher<Object>() {
+ ArgumentMatcher<TestEvent> isTestEventWithMessage(String message) {
+ return new ArgumentMatcher<TestEvent>() {
@Override
- public boolean matches(Object argument) {
+ public boolean matches(TestEvent argument) {
return argument instanceof TestEvent && message.equals(((TestEvent) argument).message);
}
};
}
+
+ ArgumentMatcher<SpecialTestEvent> isSpecialTestEventWithMessage(String message) {
+ return new ArgumentMatcher<SpecialTestEvent>() {
+
+ @Override
+ public boolean matches(SpecialTestEvent argument) {
+ return argument instanceof SpecialTestEvent && message.equals(((SpecialTestEvent) argument).message);
+ }
+ };
+ }
ArgumentMatcher<Map<String, Object>> isUntypedTestEventWithMessage(String message) {
return new ArgumentMatcher<Map<String, Object>>() {
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/AbstractIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/AbstractIntegrationTest.java
new file mode 100644
index 0000000..81cd6d2
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/AbstractIntegrationTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.osgi;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEvent2;
+import org.junit.jupiter.api.AfterEach;
+import org.mockito.ArgumentMatcher;
+import org.osgi.framework.ServiceRegistration;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ *
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+public abstract class AbstractIntegrationTest {
+
+ protected static final String TEST_EVENT_TOPIC = TestEvent.class.getName().replace(".", "/");
+
+ protected static final String TEST_EVENT_2_TOPIC = TestEvent2.class.getName().replace(".", "/");
+
+ protected final List<ServiceRegistration<?>> regs = new ArrayList<ServiceRegistration<?>>();
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ regs.forEach(sr -> {
+ try {
+ sr.unregister();
+ } catch (Exception e) { }
+ });
+ }
+
+ protected ArgumentMatcher<TestEvent> isTestEventWithMessage(String message) {
+ return new ArgumentMatcher<TestEvent>() {
+
+ @Override
+ public boolean matches(TestEvent argument) {
+ return message.equals(argument.message);
+ }
+ };
+ }
+
+ protected ArgumentMatcher<TestEvent2> isTestEvent2WithMessage(String message) {
+ return new ArgumentMatcher<TestEvent2>() {
+
+ @Override
+ public boolean matches(TestEvent2 argument) {
+ return message.equals(argument.subEvent.message);
+ }
+ };
+ }
+
+ protected ArgumentMatcher<Map<String, Object>> isUntypedTestEventWithMessage(String message) {
+ return new ArgumentMatcher<Map<String, Object>>() {
+
+ @Override
+ public boolean matches(Map<String, Object> argument) {
+ return argument != null && message.equals(argument.get("message"));
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/EventDeliveryIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/EventDeliveryIntegrationTest.java
new file mode 100644
index 0000000..40543fa
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/EventDeliveryIntegrationTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.osgi;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.eq;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEvent2;
+import org.apache.aries.typedevent.bus.common.TestEvent2.EventType;
+import org.apache.aries.typedevent.bus.common.TestEvent2Consumer;
+import org.apache.aries.typedevent.bus.common.TestEventConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.TypedEventConstants;
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.service.typedevent.UntypedEventHandler;
+import org.osgi.test.common.annotation.InjectBundleContext;
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ *
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class EventDeliveryIntegrationTest extends AbstractIntegrationTest {
+
+ @InjectBundleContext
+ BundleContext context;
+
+ @InjectService
+ TypedEventBus eventBus;
+
+ @Mock
+ TestEventConsumer typedEventHandler;
+
+ @Mock
+ TestEvent2Consumer typedEventHandler2;
+
+ @Mock
+ UntypedEventHandler untypedEventHandler, untypedEventHandler2;
+
+ private AutoCloseable mocks;
+
+ @BeforeEach
+ public void setupMocks() {
+ mocks = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterEach
+ public void stop() throws Exception {
+ mocks.close();
+ }
+
+ /**
+ * Tests that events are delivered to untyped Event Handlers
+ * based on topic
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testEventReceiving() throws InterruptedException {
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ Dictionary<String, Object> props = new Hashtable<>();
+
+ regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
+
+ regs.add(context.registerService(TypedEventHandler.class, typedEventHandler2, props));
+
+ eventBus.deliver(event);
+
+ Mockito.verify(typedEventHandler, Mockito.timeout(1000)).notify(
+ Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("boo")));
+
+ Mockito.verify(typedEventHandler2, Mockito.after(1000).never()).notify(
+ Mockito.eq(TEST_EVENT_TOPIC), Mockito.any());
+ }
+
+ /**
+ * Tests that events are delivered to untyped Event Handlers
+ * based on topic
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testEventReceivingUntyped() throws InterruptedException {
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+
+ regs.add(context.registerService(UntypedEventHandler.class, untypedEventHandler, props));
+
+ props = new Hashtable<>();
+
+ props.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_2_TOPIC);
+
+ regs.add(context.registerService(UntypedEventHandler.class, untypedEventHandler2, props));
+
+
+ eventBus.deliver(event);
+
+ Mockito.verify(untypedEventHandler, Mockito.timeout(1000)).notifyUntyped(
+ Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isUntypedTestEventWithMessage("boo")));
+
+ Mockito.verify(untypedEventHandler2, Mockito.after(1000).never()).notifyUntyped(
+ Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isUntypedTestEventWithMessage("boo")));
+
+ }
+
+ @Test
+ public void testSendComplexEvent() throws Exception {
+ Dictionary<String, Object> props = new Hashtable<>();
+
+ regs.add(context.registerService(TypedEventHandler.class, typedEventHandler2, props));
+
+ TestEvent event = new TestEvent();
+ event.message = "foo";
+
+ TestEvent2 event2 = TestEvent2.create(event);
+
+ eventBus.deliver(event2);
+
+
+ Mockito.verify(typedEventHandler2, Mockito.timeout(1000))
+ .notify(Mockito.eq(TEST_EVENT_2_TOPIC), Mockito.argThat(isTestEvent2WithMessage("foo")));
+ }
+
+ @Test
+ public void testSendComplexEventToUntypedReceiver() throws Exception {
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_2_TOPIC);
+
+ regs.add(context.registerService(UntypedEventHandler.class,
+ untypedEventHandler, props));
+
+ TestEvent event = new TestEvent();
+ event.message = "foo";
+
+ TestEvent2 event2 = TestEvent2.create(event);
+
+ eventBus.deliver(event2);
+
+ @SuppressWarnings("unchecked")
+ ArgumentCaptor<Map<String, Object>> captor = ArgumentCaptor.forClass(Map.class);
+
+ Mockito.verify(untypedEventHandler, Mockito.timeout(1000))
+ .notifyUntyped(eq(TEST_EVENT_2_TOPIC), captor.capture());
+
+ Map<String, Object> map = captor.getValue();
+
+ // Should be a String not an enum as we can't see the types
+ assertEquals("RED", map.get("eventType"));
+ @SuppressWarnings("unchecked")
+ Map<String, Object> subMap = (Map<String, Object>) map.get("subEvent");
+
+ assertEquals("foo", subMap.get("message"));
+ }
+
+ @Test
+ public void testSendComplexUntypedEventToTypedReceiver() throws Exception {
+ Dictionary<String, Object> props = new Hashtable<>();
+
+ regs.add(context.registerService(TypedEventHandler.class,
+ typedEventHandler2, props));
+
+ Map<String, Object> event = new HashMap<>();
+ event.put("message", "foo");
+
+ Map<String, Object> event2 = new HashMap<>();
+ event2.put("subEvent", event);
+ event2.put("eventType", "BLUE");
+
+ eventBus.deliver(TEST_EVENT_2_TOPIC, event2);
+
+ ArgumentCaptor<TestEvent2> captor = ArgumentCaptor.forClass(TestEvent2.class);
+
+ Mockito.verify(typedEventHandler2, Mockito.timeout(1000))
+ .notify(eq(TEST_EVENT_2_TOPIC), captor.capture());
+
+ TestEvent2 received = captor.getValue();
+
+ // Should be a String not an enum as we can't see the types
+ assertEquals(EventType.BLUE, received.eventType);
+
+ assertEquals("foo", received.subEvent.message);
+ }
+
+
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java
new file mode 100644
index 0000000..d4dcc78
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or eventBusied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.osgi;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEventConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.test.common.annotation.InjectBundleContext;
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ *
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class FilterIntegrationTest extends AbstractIntegrationTest {
+
+ @InjectBundleContext
+ BundleContext context;
+
+ @InjectService
+ TypedEventBus eventBus;
+
+ @Mock
+ TestEventConsumer typedEventHandler, typedEventHandlerB;
+
+ private AutoCloseable mocks;
+
+ @BeforeEach
+ public void setupMocks() {
+ mocks = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterEach
+ public void stop() throws Exception {
+ mocks.close();
+ }
+
+ @Test
+ public void testFilteredListener() throws Exception {
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put("event.filter", "(message=foo)");
+
+ regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
+
+ props = new Hashtable<>();
+ props.put("event.filter", "(message=bar)");
+
+ regs.add(context.registerService(TypedEventHandler.class, typedEventHandlerB, props));
+
+ TestEvent event = new TestEvent();
+ event.message = "foo";
+
+ eventBus.deliver(event);
+
+ Mockito.verify(typedEventHandler, Mockito.timeout(1000))
+ .notify(Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("foo")));
+
+ Mockito.verify(typedEventHandlerB, Mockito.after(1000).never())
+ .notify(Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("foo")));
+
+
+ event = new TestEvent();
+ event.message = "bar";
+
+ eventBus.deliver(event);
+
+ Mockito.verify(typedEventHandlerB, Mockito.timeout(1000))
+ .notify(Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("bar")));
+
+ Mockito.verify(typedEventHandler, Mockito.after(1000).never())
+ .notify(Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("bar")));
+ }
+
+ @Test
+ public void testFilteredListenerEmptyString() throws Exception {
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put("event.filter", "");
+
+
+ regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
+
+ TestEvent event = new TestEvent();
+ event.message = "foo";
+
+ eventBus.deliver(event);
+
+ Mockito.verify(typedEventHandler, Mockito.timeout(1000))
+ .notify(Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("foo")));
+ }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
new file mode 100644
index 0000000..3c6d5c8
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/TypedEventMonitorIntegrationTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or eventBusied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.osgi;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.lang.reflect.InvocationTargetException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEventConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.service.typedevent.monitor.MonitorEvent;
+import org.osgi.service.typedevent.monitor.TypedEventMonitor;
+import org.osgi.test.common.annotation.InjectBundleContext;
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+import org.osgi.util.promise.Promise;
+
+
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class TypedEventMonitorIntegrationTest extends AbstractIntegrationTest {
+
+ TypedEventMonitor monitor;
+
+ TypedEventBus eventBus;
+
+ @Mock
+ TestEventConsumer typedEventHandler;
+
+ private AutoCloseable mocks;
+
+ private static Bundle eventBusBundle;
+
+ @BeforeAll
+ public static void clearInitialHistory(@InjectBundleContext BundleContext ctx) throws Exception {
+ eventBusBundle = getBusBundle(ctx);
+
+ eventBusBundle.stop();
+ eventBusBundle.start();
+ }
+
+ private static Bundle getBusBundle(BundleContext ctx) {
+ return Arrays.stream(ctx.getBundles())
+ .filter(b -> "org.apache.aries.typedevent.bus".equals(b.getSymbolicName()))
+ .findAny().orElse(null);
+ }
+
+ @BeforeEach
+ public void setupMocks() {
+ mocks = MockitoAnnotations.openMocks(this);
+ }
+
+ /**
+ * Inject services every time as we restart the eventBus after each test
+ * @param monitor
+ * @param bus
+ */
+ @BeforeEach
+ public void setupMocks(@InjectService TypedEventMonitor monitor, @InjectService TypedEventBus bus) {
+ this.monitor = monitor;
+ this.eventBus = bus;
+ }
+
+ @AfterEach
+ public void stop() throws Exception {
+ mocks.close();
+
+ // Needed to clear history from previous tests
+ eventBusBundle.stop();
+ eventBusBundle.start();
+ }
+
+ /**
+ * Tests that events are delivered to the monitor
+ *
+ * @throws InterruptedException
+ * @throws InvocationTargetException
+ */
+ @Test
+ public void testTypedEventMonitor1() throws InterruptedException, InvocationTargetException {
+
+ Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents()
+ .limit(2)
+ .collect(Collectors.toList());
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ Dictionary<String, Object> props = new Hashtable<>();
+
+ regs.add(eventBusBundle.getBundleContext().registerService(TypedEventHandler.class, typedEventHandler, props));
+
+ eventBus.deliver(event);
+
+ event = new TestEvent();
+ event.message = "bam";
+
+ eventBus.deliver(event);
+
+
+ Mockito.verify(typedEventHandler, Mockito.timeout(2000)).notify(
+ Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("boo")));
+ Mockito.verify(typedEventHandler, Mockito.timeout(2000)).notify(
+ Mockito.eq(TEST_EVENT_TOPIC), Mockito.argThat(isTestEventWithMessage("bam")));
+
+ List<MonitorEvent> events = eventsPromise.timeout(100).getValue();
+
+ assertEquals(2, events.size());
+
+ assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+ assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+
+ assertEquals("boo", events.get(0).eventData.get("message"));
+ assertEquals("bam", events.get(1).eventData.get("message"));
+
+
+ }
+
+
+ /**
+ * Tests that events are delivered to the monitor even when nobody is listening
+ *
+ * @throws InterruptedException
+ * @throws InvocationTargetException
+ */
+ @Test
+ public void testTypedEventMonitor2() throws InterruptedException, InvocationTargetException {
+
+ Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents()
+ .limit(2)
+ .collect(Collectors.toList());
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ eventBus.deliver(event);
+
+ event = new TestEvent();
+ event.message = "bam";
+
+ eventBus.deliver(event);
+
+ List<MonitorEvent> events = eventsPromise.timeout(2000).getValue();
+
+ assertEquals(2, events.size());
+
+ assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+ assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+
+ assertEquals("boo", events.get(0).eventData.get("message"));
+ assertEquals("bam", events.get(1).eventData.get("message"));
+
+
+ }
+
+ /**
+ * Tests that event history is delivered to the monitor
+ *
+ * @throws InterruptedException
+ * @throws InvocationTargetException
+ */
+ @Test
+ public void testTypedEventMonitorHistory1() throws InterruptedException, InvocationTargetException {
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ eventBus.deliver(event);
+
+ event = new TestEvent();
+ event.message = "bam";
+
+ eventBus.deliver(event);
+
+ Thread.sleep(500);
+
+ Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents()
+ .limit(Duration.ofSeconds(1))
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ List<MonitorEvent> events = eventsPromise.getValue();
+
+ assertTrue(events.isEmpty());
+
+ eventsPromise = monitor.monitorEvents(5)
+ .limit(Duration.ofSeconds(1))
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ events = eventsPromise.getValue();
+
+ assertEquals(2, events.size(), events.toString());
+
+ assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+ assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+
+ assertEquals("boo", events.get(0).eventData.get("message"));
+ assertEquals("bam", events.get(1).eventData.get("message"));
+
+ eventsPromise = monitor.monitorEvents(1)
+ .limit(Duration.ofSeconds(1))
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ events = eventsPromise.getValue();
+
+ assertEquals(1, events.size());
+
+ assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+
+ assertEquals("bam", events.get(0).eventData.get("message"));
+
+
+ }
+
+ /**
+ * Tests that event history is delivered to the monitor
+ *
+ * @throws InterruptedException
+ * @throws InvocationTargetException
+ */
+ @Test
+ public void testTypedEventMonitorHistory2() throws InterruptedException, InvocationTargetException {
+
+ Instant beforeFirst = Instant.now().minus(Duration.ofMillis(500));
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ eventBus.deliver(event);
+
+ Instant afterFirst = Instant.now().plus(Duration.ofMillis(500));
+
+ Thread.sleep(1000);
+
+ event = new TestEvent();
+ event.message = "bam";
+
+ eventBus.deliver(event);
+
+ Instant afterSecond = Instant.now().plus(Duration.ofMillis(500));
+
+ Thread.sleep(500);
+
+ Promise<List<MonitorEvent>> eventsPromise = monitor.monitorEvents()
+ .limit(Duration.ofSeconds(1))
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+
+ List<MonitorEvent> events = eventsPromise.getValue();
+
+ assertTrue(events.isEmpty());
+
+ eventsPromise = monitor.monitorEvents(beforeFirst)
+ .limit(Duration.ofSeconds(1))
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ events = eventsPromise.getValue();
+
+ assertEquals(2, events.size());
+
+ assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+ assertEquals(TEST_EVENT_TOPIC, events.get(1).topic);
+
+ assertEquals("boo", events.get(0).eventData.get("message"));
+ assertEquals("bam", events.get(1).eventData.get("message"));
+
+ eventsPromise = monitor.monitorEvents(afterFirst)
+ .limit(Duration.ofSeconds(1))
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ events = eventsPromise.getValue();
+
+ assertEquals(1, events.size());
+
+ assertEquals(TEST_EVENT_TOPIC, events.get(0).topic);
+
+ assertEquals("bam", events.get(0).eventData.get("message"));
+
+ eventsPromise = monitor.monitorEvents(afterSecond)
+ .limit(Duration.ofSeconds(1))
+ .collect(Collectors.toList())
+ .timeout(2000);
+
+ events = eventsPromise.getValue();
+
+ assertTrue(events.isEmpty());
+ }
+
+}
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
new file mode 100644
index 0000000..c781b62
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.bus.osgi;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.aries.typedevent.bus.common.TestEvent;
+import org.apache.aries.typedevent.bus.common.TestEventConsumer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.service.typedevent.UnhandledEventHandler;
+import org.osgi.service.typedevent.UntypedEventHandler;
+import org.osgi.test.common.annotation.InjectBundleContext;
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ *
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class UnhandledEventHandlerIntegrationTest extends AbstractIntegrationTest {
+
+ @InjectBundleContext
+ BundleContext context;
+
+ @InjectService
+ TypedEventBus eventBus;
+
+ @Mock
+ TestEventConsumer typedEventHandler;
+
+ @Mock
+ UntypedEventHandler untypedEventHandler;
+
+ @Mock
+ UnhandledEventHandler unhandledEventHandler;
+
+ private AutoCloseable mocks;
+
+ @BeforeEach
+ public void setupMocks() {
+ mocks = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterEach
+ public void stop() throws Exception {
+ mocks.close();
+ }
+
+ /**
+ * Tests that the unhandledEventHandler gets called appropriately
+ * @throws InterruptedException
+ */
+ @Test
+ public void testUnhandledDueToTopic() throws InterruptedException {
+
+ Dictionary<String, Object> props = new Hashtable<>();
+
+ regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
+
+ props = new Hashtable<>();
+
+ regs.add(context.registerService(UnhandledEventHandler.class, unhandledEventHandler, props));
+
+
+ TestEvent event = new TestEvent();
+ event.message = "foo";
+
+ eventBus.deliver(event);
+
+ verify(typedEventHandler, timeout(1000)).notify(eq(TEST_EVENT_TOPIC),
+ argThat(isTestEventWithMessage("foo")));
+
+ verify(unhandledEventHandler, after(1000).never()).notifyUnhandled(anyString(), anyMap());
+
+
+ eventBus.deliver("anotherTopic", event);
+
+ verify(typedEventHandler, after(1000).never()).notify(eq("anotherTopic"), any());
+
+ verify(unhandledEventHandler, timeout(1000)).notifyUnhandled(eq("anotherTopic"),
+ argThat(isUntypedTestEventWithMessage("foo")));
+
+ }
+
+ /**
+ * Tests that the consumer of last resort gets called appropriately
+ * @throws InterruptedException
+ */
+ @Test
+ public void testUnhandledDueToFilter() throws InterruptedException {
+
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put("event.filter", "(message=foo)");
+
+ regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
+
+ props = new Hashtable<>();
+
+ regs.add(context.registerService(UnhandledEventHandler.class, unhandledEventHandler, props));
+
+
+ TestEvent event = new TestEvent();
+ event.message = "foo";
+
+ eventBus.deliver(event);
+
+ verify(typedEventHandler, timeout(1000)).notify(eq(TEST_EVENT_TOPIC),
+ argThat(isTestEventWithMessage("foo")));
+
+ verify(unhandledEventHandler, after(1000).never()).notifyUnhandled(anyString(), anyMap());
+
+
+ event = new TestEvent();
+ event.message = "bar";
+
+
+ eventBus.deliver(event);
+
+ verify(typedEventHandler, after(1000).never()).notify(eq(TEST_EVENT_TOPIC),
+ argThat(isTestEventWithMessage("bar")));
+
+ verify(unhandledEventHandler, timeout(1000)).notifyUnhandled(eq(TEST_EVENT_TOPIC),
+ argThat(isUntypedTestEventWithMessage("bar")));
+
+ }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.bus/test.bndrun b/org.apache.aries.typedevent.bus/test.bndrun
new file mode 100644
index 0000000..6735898
--- /dev/null
+++ b/org.apache.aries.typedevent.bus/test.bndrun
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+-tester: biz.aQute.tester.junit-platform
+
+-runfw: org.apache.felix.framework
+
+-runrequires: bnd.identity;id="org.apache.aries.typedevent.bus-tests",\
+ bnd.identity;id="junit-jupiter-engine",\
+ bnd.identity;id="junit-platform-launcher"
+
+-runsystempackages: sun.reflect
+
+-resolve.effective: active
+-runbundles: \
+ ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
+ ch.qos.logback.core;version='[1.2.3,1.2.4)',\
+ org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
+ org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)',\
+ org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
+ org.apache.felix.converter;version='[1.0.14,1.0.15)',\
+ org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
+ org.osgi.util.function;version='[1.1.0,1.1.1)',\
+ org.osgi.util.promise;version='[1.1.1,1.1.2)',\
+ org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
+ slf4j.api;version='[1.7.30,1.7.31)',\
+ junit-jupiter-api;version='[5.6.2,5.6.3)',\
+ junit-platform-commons;version='[1.6.2,1.6.3)',\
+ net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\
+ net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\
+ org.apache.aries.typedevent.bus-tests;version='[0.0.1,0.0.2)',\
+ org.mockito.mockito-core;version='[3.5.10,3.5.11)',\
+ org.objenesis;version='[3.1.0,3.1.1)',\
+ org.opentest4j;version='[1.2.0,1.2.1)',\
+ org.osgi.test.common;version='[0.9.0,0.9.1)',\
+ org.osgi.test.junit5;version='[0.9.0,0.9.1)',\
+ junit-platform-engine;version='[1.6.2,1.6.3)',\
+ junit-platform-launcher;version='[1.6.2,1.6.3)',\
+ junit-jupiter-engine;version='[5.6.2,5.6.3)'
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 9ccbbbe..dfb1c98 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,7 +48,7 @@
</repositories>
<properties>
- <bnd.version>5.1.0</bnd.version>
+ <bnd.version>5.1.2</bnd.version>
<dsl.version>1.2.2</dsl.version>
<junit.version>5.6.2</junit.version>
<mockito.version>3.5.10</mockito.version>
@@ -113,25 +113,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.7.0</version>
- </dependency>
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>${mockito.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-junit-jupiter</artifactId>
- <version>${mockito.version}</version>
- <scope>test</scope>
+ <version>1.7.30</version>
</dependency>
</dependencies>
</dependencyManagement>
@@ -140,6 +122,26 @@
<pluginManagement>
<plugins>
<plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>3.2.0</version>
+ <executions>
+ <execution>
+ <id>regex-property</id>
+ <goals>
+ <goal>regex-property</goal>
+ </goals>
+ <configuration>
+ <name>path.from.artifactId</name>
+ <value>${project.artifactId}</value>
+ <regex>\.</regex>
+ <replacement>/</replacement>
+ <failIfNoMatch>false</failIfNoMatch>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
@@ -157,6 +159,19 @@
<manifestFile>${project.build.outputDirectory}/META-INF/MANIFEST.MF</manifestFile>
</archive>
</configuration>
+ <executions>
+ <execution>
+ <id>test-jar</id>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifestFile>${project.build.testOutputDirectory}/META-INF/MANIFEST.MF</manifestFile>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -172,6 +187,9 @@
<version>2.22.2</version>
<configuration>
<source>1.8</source>
+ <excludes>
+ <exclude>${path.from.artifactId}/osgi/**</exclude>
+ </excludes>
</configuration>
</plugin>
<plugin>
@@ -185,19 +203,124 @@
<goal>bnd-process</goal>
</goals>
</execution>
+ <execution>
+ <id>test-bnd-process</id>
+ <goals>
+ <goal>bnd-process-tests</goal>
+ </goals>
+ <configuration>
+ <artifactFragment>true</artifactFragment>
+ <testCases>useTestCasesHeader</testCases>
+ <bnd><![CDATA[
+ Test-Cases:${select;${classes;HIERARCHY_INDIRECTLY_ANNOTATED;org.junit.platform.commons.annotation.Testable;CONCRETE};${project.artifactId}.osgi.*}
+ ]]></bnd>
+ </configuration>
+ </execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-resolver-maven-plugin</artifactId>
+ <version>${bnd.version}</version>
+ <executions>
+ <execution>
+ <id>resolve-test</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>resolve</goal>
+ </goals>
+ <configuration>
+ <bndruns>
+ <bndrun>test.bndrun</bndrun>
+ </bndruns>
+ <bundles>
+ <bundle>${project.build.directory}/${project.build.finalName}-tests.jar</bundle>
+ </bundles>
+ <failOnChanges>false</failOnChanges>
+ <includeDependencyManagement>true</includeDependencyManagement>
+ <reportOptional>false</reportOptional>
+ <scopes>
+ <scope>compile</scope>
+ <scope>runtime</scope>
+ <scope>test</scope>
+ </scopes>
+ </configuration>
+ </execution>
+ <execution>
+ <id>resolve-run</id>
+ <goals>
+ <goal>resolve</goal>
+ </goals>
+ <configuration>
+ <bndruns>
+ <bndrun>run.bndrun</bndrun>
+ </bndruns>
+ <failOnChanges>false</failOnChanges>
+ <includeDependencyManagement>true</includeDependencyManagement>
+ <reportOptional>false</reportOptional>
+ <scopes>
+ <scope>compile</scope>
+ <scope>runtime</scope>
+ <scope>test</scope>
+ </scopes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-testing-maven-plugin</artifactId>
+ <version>${bnd.version}</version>
+ <executions>
+ <execution>
+ <id>testing</id>
+ <goals>
+ <goal>testing</goal>
+ </goals>
+ <configuration>
+ <bndruns>
+ <bndrun>test.bndrun</bndrun>
+ </bndruns>
+ <bundles>
+ <bundle>${project.build.directory}/${project.build.finalName}-tests.jar</bundle>
+ </bundles>
+ <failOnChanges>false</failOnChanges>
+ <includeDependencyManagement>true</includeDependencyManagement>
+ <resolve>false</resolve>
+ <scopes>
+ <scope>compile</scope>
+ <scope>runtime</scope>
+ <scope>test</scope>
+ </scopes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-run-maven-plugin</artifactId>
+ <version>${bnd.version}</version>
+ <configuration>
+ <bndrun>run.bndrun</bndrun>
+ <includeDependencyManagement>true</includeDependencyManagement>
+ <scopes>
+ <scope>compile</scope>
+ <scope>runtime</scope>
+ <scope>test</scope>
+ </scopes>
+ </configuration>
+ </plugin>
</plugins>
</pluginManagement>
-
<plugins>
<plugin>
- <groupId>biz.aQute.bnd</groupId>
- <artifactId>bnd-maven-plugin</artifactId>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<modules>
+ <module>typedevent-test-bom</module>
<module>org.apache.aries.typedevent.bus</module>
</modules>
</project>
diff --git a/typedevent-test-bom/pom.xml b/typedevent-test-bom/pom.xml
new file mode 100644
index 0000000..e3c703e
--- /dev/null
+++ b/typedevent-test-bom/pom.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ you under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.typedevent</groupId>
+ <artifactId>typedevent-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>typedevent-test-bom</artifactId>
+ <packaging>pom</packaging>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.junit</groupId>
+ <artifactId>junit-bom</artifactId>
+ <version>${junit.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.test.junit5</artifactId>
+ <version>0.9.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.framework</artifactId>
+ <version>6.0.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.cm</artifactId>
+ <version>1.6.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.configadmin</artifactId>
+ <version>1.9.18</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.2.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ <version>1.2.3</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <!-- The Web Console -->
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.webconsole</artifactId>
+ <version>4.3.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.webconsole.plugins.ds</artifactId>
+ <version>2.0.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.inventory</artifactId>
+ <version>1.0.4</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- The Gogo Shell -->
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.gogo.shell</artifactId>
+ <version>1.0.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.gogo.runtime</artifactId>
+ <version>1.0.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.gogo.command</artifactId>
+ <version>1.0.2</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+</project>
\ No newline at end of file