Initial support for remote events using OSGi Remote Services
diff --git a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
index 5be83f9..21a0828 100644
--- a/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
+++ b/org.apache.aries.typedevent.bus/src/main/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImpl.java
@@ -19,6 +19,11 @@
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
+import static org.osgi.namespace.implementation.ImplementationNamespace.IMPLEMENTATION_NAMESPACE;
+import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_IMPLEMENTATION;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_SPECIFICATION_VERSION;
import static org.osgi.util.converter.Converters.standardConverter;
import java.lang.reflect.ParameterizedType;
@@ -34,6 +39,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
+import org.osgi.annotation.bundle.Capability;
import org.osgi.framework.Constants;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
@@ -45,6 +51,8 @@
import org.osgi.service.typedevent.UntypedEventHandler;
import org.osgi.util.converter.TypeReference;
+@Capability(namespace=SERVICE_NAMESPACE, attribute="objectClass:List<String>=org.osgi.service.typedevent.TypedEventBus", uses=TypedEventBus.class)
+@Capability(namespace=IMPLEMENTATION_NAMESPACE, name=TYPED_EVENT_IMPLEMENTATION, version=TYPED_EVENT_SPECIFICATION_VERSION)
public class TypedEventBusImpl implements TypedEventBus {
private static final TypeReference<List<String>> LIST_OF_STRINGS = new TypeReference<List<String>>() {
@@ -248,7 +256,7 @@
}
private Filter getFilter(Long serviceId, Map<String, Object> properties) throws IllegalArgumentException {
- String key = "event.filter";
+ String key = TYPED_EVENT_FILTER;
return getFilter(serviceId, key, properties.get(key));
}
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
index 6667969..39bfbc9 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/impl/TypedEventBusImplTest.java
@@ -21,6 +21,11 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
+import static org.osgi.framework.Constants.SERVICE_ID;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TYPE;
+import static org.osgi.util.converter.Converters.standardConverter;
import java.util.HashMap;
import java.util.Map;
@@ -35,11 +40,9 @@
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.osgi.framework.Constants;
-import org.osgi.service.typedevent.TypedEventConstants;
import org.osgi.service.typedevent.TypedEventHandler;
import org.osgi.service.typedevent.UnhandledEventHandler;
import org.osgi.service.typedevent.UntypedEventHandler;
-import org.osgi.util.converter.Converters;
public class TypedEventBusImplTest {
@@ -132,31 +135,31 @@
Map<String, Object> serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
- serviceProperties.put(Constants.SERVICE_ID, 42L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+ serviceProperties.put(SERVICE_ID, 42L);
impl.addTypedEventHandler(handlerA, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent2.class.getName());
- serviceProperties.put(Constants.SERVICE_ID, 43L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
+ serviceProperties.put(TYPED_EVENT_TYPE, TestEvent2.class.getName());
+ serviceProperties.put(SERVICE_ID, 43L);
impl.addTypedEventHandler(handlerB, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
- serviceProperties.put(Constants.SERVICE_ID, 44L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ serviceProperties.put(SERVICE_ID, 44L);
impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
- serviceProperties.put(Constants.SERVICE_ID, 45L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName().replace(".", "/"));
+ serviceProperties.put(SERVICE_ID, 45L);
impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
@@ -206,20 +209,20 @@
event.message = "boo";
Map<String, Object> serviceProperties = new HashMap<>();
- serviceProperties.put(Constants.SERVICE_ID, 42L);
+ serviceProperties.put(SERVICE_ID, 42L);
impl.addTypedEventHandler(handler, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, SpecialTestEvent.class.getName());
- serviceProperties.put(Constants.SERVICE_ID, 43L);
+ serviceProperties.put(TYPED_EVENT_TYPE, SpecialTestEvent.class.getName());
+ serviceProperties.put(SERVICE_ID, 43L);
impl.addTypedEventHandler(handler2, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(Constants.SERVICE_ID, 44L);
+ serviceProperties.put(SERVICE_ID, 44L);
impl.addTypedEventHandler(handler3, serviceProperties);
@@ -258,35 +261,35 @@
Map<String, Object> serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName());
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
- serviceProperties.put(Constants.SERVICE_ID, 42L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent.class.getName());
+ serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+ serviceProperties.put(SERVICE_ID, 42L);
impl.addTypedEventHandler(handlerA, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName());
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent2.class.getName());
- serviceProperties.put(Constants.SERVICE_ID, 43L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName());
+ serviceProperties.put(TYPED_EVENT_TYPE, TestEvent2.class.getName());
+ serviceProperties.put(SERVICE_ID, 43L);
impl.addTypedEventHandler(handlerB, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent.class.getName());
- serviceProperties.put(Constants.SERVICE_ID, 44L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent.class.getName());
+ serviceProperties.put(SERVICE_ID, 44L);
impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TestEvent2.class.getName());
- serviceProperties.put(Constants.SERVICE_ID, 45L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TestEvent2.class.getName());
+ serviceProperties.put(SERVICE_ID, 45L);
impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
- impl.deliver(event.getClass().getName(), Converters.standardConverter().convert(event).to(Map.class));
+ impl.deliver(event.getClass().getName(), standardConverter().convert(event).to(Map.class));
assertTrue(semA.tryAcquire(1, TimeUnit.SECONDS));
@@ -314,35 +317,35 @@
Map<String, Object> serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
- serviceProperties.put("event.filter", "(message=foo)");
- serviceProperties.put(Constants.SERVICE_ID, 42L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+ serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)");
+ serviceProperties.put(SERVICE_ID, 42L);
impl.addTypedEventHandler(handlerA, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
- serviceProperties.put("event.filter", "(message=bar)");
- serviceProperties.put(Constants.SERVICE_ID, 43L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+ serviceProperties.put(TYPED_EVENT_FILTER, "(message=bar)");
+ serviceProperties.put(SERVICE_ID, 43L);
impl.addTypedEventHandler(handlerB, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
- serviceProperties.put("event.filter", "(message=foo)");
- serviceProperties.put(Constants.SERVICE_ID, 44L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)");
+ serviceProperties.put(SERVICE_ID, 44L);
impl.addUntypedEventHandler(untypedHandlerA, serviceProperties);
serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
- serviceProperties.put("event.filter", "(message=bar)");
- serviceProperties.put(Constants.SERVICE_ID, 45L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ serviceProperties.put(TYPED_EVENT_FILTER, "(message=bar)");
+ serviceProperties.put(SERVICE_ID, 45L);
impl.addUntypedEventHandler(untypedHandlerB, serviceProperties);
@@ -395,10 +398,10 @@
Map<String, Object> serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
- serviceProperties.put("event.filter", "");
- serviceProperties.put(Constants.SERVICE_ID, 42L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+ serviceProperties.put(TYPED_EVENT_FILTER, "");
+ serviceProperties.put(SERVICE_ID, 42L);
impl.addTypedEventHandler(handlerA, serviceProperties);
@@ -420,10 +423,10 @@
Map<String, Object> serviceProperties = new HashMap<>();
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
- serviceProperties.put(TypedEventConstants.TYPED_EVENT_TYPE, TestEvent.class.getName());
- serviceProperties.put("event.filter", "(message=foo)");
- serviceProperties.put(Constants.SERVICE_ID, 42L);
+ serviceProperties.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ serviceProperties.put(TYPED_EVENT_TYPE, TestEvent.class.getName());
+ serviceProperties.put(TYPED_EVENT_FILTER, "(message=foo)");
+ serviceProperties.put(SERVICE_ID, 42L);
impl.addTypedEventHandler(handlerA, serviceProperties);
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java
index d4dcc78..2d17847 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/FilterIntegrationTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.aries.typedevent.bus.osgi;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+
import java.util.Dictionary;
import java.util.Hashtable;
@@ -71,12 +73,12 @@
@Test
public void testFilteredListener() throws Exception {
Dictionary<String, Object> props = new Hashtable<>();
- props.put("event.filter", "(message=foo)");
+ props.put(TYPED_EVENT_FILTER, "(message=foo)");
regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
props = new Hashtable<>();
- props.put("event.filter", "(message=bar)");
+ props.put(TYPED_EVENT_FILTER, "(message=bar)");
regs.add(context.registerService(TypedEventHandler.class, typedEventHandlerB, props));
@@ -107,7 +109,7 @@
@Test
public void testFilteredListenerEmptyString() throws Exception {
Dictionary<String, Object> props = new Hashtable<>();
- props.put("event.filter", "");
+ props.put(TYPED_EVENT_FILTER, "");
regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
diff --git a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
index c781b62..084a92a 100644
--- a/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
+++ b/org.apache.aries.typedevent.bus/src/test/java/org/apache/aries/typedevent/bus/osgi/UnhandledEventHandlerIntegrationTest.java
@@ -24,6 +24,7 @@
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
import java.util.Dictionary;
import java.util.Hashtable;
@@ -128,7 +129,7 @@
public void testUnhandledDueToFilter() throws InterruptedException {
Dictionary<String, Object> props = new Hashtable<>();
- props.put("event.filter", "(message=foo)");
+ props.put(TYPED_EVENT_FILTER, "(message=foo)");
regs.add(context.registerService(TypedEventHandler.class, typedEventHandler, props));
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/pom.xml b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/pom.xml
new file mode 100644
index 0000000..a54960e
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/pom.xml
@@ -0,0 +1,41 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.typedevent</groupId>
+ <artifactId>typedevent-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.aries.typedevent.remote</groupId>
+ <artifactId>org.apache.aries.typedevent.remote.api</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.typedevent</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.annotation</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.annotation.bundle</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/FilterDTO.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/FilterDTO.java
new file mode 100644
index 0000000..56902c7
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/FilterDTO.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.api;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.dto.DTO;
+
+/**
+ * A monitoring event filter.
+ *
+ * If both LDAP and regular expressions are supplied, then both must match.
+ */
+@ProviderType
+public class FilterDTO extends DTO {
+
+ public String ldapExpression;
+
+ public String regularExpression;
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventConstants.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventConstants.java
new file mode 100644
index 0000000..9b0ab57
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.api;
+
+import org.osgi.annotation.versioning.ProviderType;
+
+/**
+ * This interface should not be used by typical users of the
+ * Typed Event specification. It is intended to be a bridge
+ * between different mechanisms for broadcasting remote events
+ */
+
+@ProviderType
+public class RemoteEventConstants {
+
+ /**
+ * This property key will be set to true in any event that originated from a remote system.
+ * This is to allow different remoting implementations to identify events which should not
+ * be sent on externally, as they are already external.
+ */
+ public static final String REMOTE_EVENT_MARKER = ".org.apache.aries.typedevent.remote";
+
+ /**
+ * This service property can be used by Event Handler whiteboard services to signal that
+ * they wish to receive remote events by using the value <code>true</code>. Depending
+ * upon the configuration of the remote event backend it may not be necessary to supply
+ * this property to receive remote events.
+ */
+ public static final String RECEIVE_REMOTE_EVENTS = "org.apache.aries.typedevent.remote.events";
+
+ private RemoteEventConstants() {
+ // Deliberately impossible to construct
+ }
+
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventMonitor.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventMonitor.java
new file mode 100644
index 0000000..0bbcf9c
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEventMonitor.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.api;
+
+import org.osgi.annotation.versioning.ProviderType;
+import org.osgi.util.pushstream.PushStream;
+
+import java.time.Instant;
+
+/**
+ * The {@link RemoteEventMonitor} service can be used to monitor the events that are
+ * sent using the EventBus, and that are received from remote EventBus
+ * instances
+ */
+@ProviderType
+public interface RemoteEventMonitor {
+
+ /**
+ * Get a stream of events that match any of the filters, starting now.
+ * <p>
+ * Filter expressions may be supplied and applied by the monitoring implementation.
+ * In some cases this may be more optimal than adding your own filter to the returned
+ * PushStream.
+ *
+ * @param filters containing filter expression definitions. The {@link RemoteMonitorEvent#eventType}
+ * field is available with the key <code>-eventType</code> and the
+ * {@link RemoteMonitorEvent#publishType} field is available with the key
+ * <code>-publishType</code>, in addition to fields defined in the event.
+ * If the event contains nested data structures then those are accessible using
+ * nested key names separated by a '.' character (e.g. <code>"foo.bar"</code>
+ * would correspond to the <code>bar<code> field of the <code>foo</code> value
+ * from the event.
+ * <p>
+ * If a {@link FilterDTO} contains both LDAP and regular expressions, then both must match.
+ * A RegEx pattern allows the whole event content to be matched, without necessarily specifying
+ * a key (although keys are present and separated with ':').
+ * @return A stream of event data
+ */
+ PushStream<RemoteMonitorEvent> monitorEvents(FilterDTO... filters);
+
+ /**
+ * Get a stream of events, including up to the
+ * requested number of historical data events, that match any of the filters.
+ *
+ * @param history The requested number of historical
+ * events, note that fewer than this number of events
+ * may be returned if history is unavailable, or if
+ * insufficient events have been sent.
+ *
+ * @param filters containing filter expression definitions. The {@link RemoteMonitorEvent#eventType}
+ * field is available with the key <code>-eventType</code> and the
+ * {@link RemoteMonitorEvent#publishType} field is available with the key
+ * <code>-publishType</code>, in addition to fields defined in the event.
+ * If the event contains nested data structures then those are accessible using
+ * nested key names separated by a '.' character (e.g. <code>"foo.bar"</code>
+ * would correspond to the <code>bar<code> field of the <code>foo</code> value
+ * from the event.
+ * <p>
+ * If a {@link FilterDTO} contains both LDAP and regular expressions, then both must match.
+ * A RegEx pattern allows the whole event content to be matched, without necessarily specifying
+ * a key (although keys are present and separated with ':').
+ *
+ * @return A stream of event data
+ */
+ PushStream<RemoteMonitorEvent> monitorEvents(int history, FilterDTO...filters);
+
+ /**
+ * Get a stream of events, including historical
+ * data events prior to the supplied time
+ *
+ * @param history The requested time after which
+ * historical events, should be included. Note
+ * that events may have been discarded, or history
+ * unavailable.
+ *
+ * @param filters containing filter expression definitions. The {@link RemoteMonitorEvent#eventType}
+ * field is available with the key <code>-eventType</code> and the
+ * {@link RemoteMonitorEvent#publishType} field is available with the key
+ * <code>-publishType</code>, in addition to fields defined in the event.
+ * If the event contains nested data structures then those are accessible using
+ * nested key names separated by a '.' character (e.g. <code>"foo.bar"</code>
+ * would correspond to the <code>bar<code> field of the <code>foo</code> value
+ * from the event.
+ * <p>
+ * If a {@link FilterDTO} contains both LDAP and regular expressions, then both must match.
+ * A RegEx pattern allows the whole event content to be matched, without necessarily specifying
+ * a key (although keys are present and separated with ':').
+ *
+ * @return A stream of event data
+ */
+ PushStream<RemoteMonitorEvent> monitorEvents(Instant history, FilterDTO...filters);
+
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEvents.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEvents.java
new file mode 100644
index 0000000..a5ad278
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteEvents.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.api;
+
+import org.osgi.service.component.annotations.ComponentPropertyType;
+
+/**
+ * This annotation can be used on a DS component to mark it as wanting to receive remote events
+ */
+@ComponentPropertyType
+public @interface RemoteEvents {
+ public static final java.lang.String PREFIX_ = "org.apache.aries.typedevent.";
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteMonitorEvent.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteMonitorEvent.java
new file mode 100644
index 0000000..2391b39
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/RemoteMonitorEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.api;
+
+import org.osgi.annotation.versioning.ProviderType;
+
+/**
+ * A monitoring event.
+ */
+@ProviderType
+public class RemoteMonitorEvent extends org.osgi.service.typedevent.monitor.MonitorEvent {
+
+ public static enum PublishType {
+ LOCAL, REMOTE;
+ }
+
+ public PublishType publishType;
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/package-info.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/package-info.java
new file mode 100644
index 0000000..da953a5
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.api/src/main/java/org/apache/aries/typedevent/remote/api/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@org.osgi.annotation.bundle.Export
+@org.osgi.annotation.versioning.Version("0.0.1")
+package org.apache.aries.typedevent.remote.api;
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/pom.xml b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/pom.xml
new file mode 100644
index 0000000..6647b7a
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/pom.xml
@@ -0,0 +1,111 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.typedevent</groupId>
+ <artifactId>typedevent-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <groupId>org.apache.aries.typedevent.remote.remoteservices</groupId>
+ <artifactId>org.apache.aries.typedevent.remote.remoteservices</artifactId>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.aries.typedevent</groupId>
+ <artifactId>typedevent-test-bom</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.typedevent</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.annotation</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.aries.typedevent.remote</groupId>
+ <artifactId>org.apache.aries.typedevent.remote.spi</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.annotation.bundle</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.namespace.service</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.aries.component-dsl</groupId>
+ <artifactId>org.apache.aries.component-dsl.component-dsl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.converter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.aries.typedevent</groupId>
+ <artifactId>org.apache.aries.typedevent.bus</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.test.junit5</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-core</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-resolver-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-testing-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/LocalEventBusForwarder.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/LocalEventBusForwarder.java
new file mode 100644
index 0000000..9a1807f
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/LocalEventBusForwarder.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+
+import static java.util.Collections.emptyMap;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.apache.aries.typedevent.remote.spi.LocalEventConsumerManager;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.util.converter.TypeReference;
+
+/**
+ * This class is responsible for taking events from the local framework and
+ * sending them on to interested remote frameworks
+ */
+public class LocalEventBusForwarder extends LocalEventConsumerManager {
+
+ private static final TypeReference<List<String>> LIST_OF_STRINGS = new TypeReference<List<String>>() {
+ };
+
+ /**
+ * Map access and mutation must be synchronized on {@link #lock}. Values from
+ * the map should be copied as the contents are not thread safe.
+ */
+ private final Map<String, Map<RemoteEventBus, Filter>> eventTypeToRemotes = new HashMap<>();
+
+ /**
+ * Map access and mutation must be synchronized on {@link #lock}.
+ * Values from the map should be copied as the contents are not thread safe.
+ */
+ private final Map<Long, List<String>> remoteTopicInterests = new HashMap<>();
+
+ /**
+ * Map access and mutation must be synchronized on {@link #lock}.
+ * Values from the map should be copied as the contents are not thread safe.
+ */
+ private final Map<Long, RemoteEventBus> remoteBuses = new HashMap<>();
+
+ private final Object lock = new Object();
+
+ @Override
+ public void notifyUntyped(String topic, Map<String, Object> event) {
+ Map<RemoteEventBus, Filter> possibleTargets;
+ synchronized (lock) {
+ possibleTargets = eventTypeToRemotes.getOrDefault(topic, emptyMap());
+ }
+
+ possibleTargets.entrySet().stream()
+ .filter(e -> e.getValue() == null || e.getValue().matches(event))
+ .map(Entry::getKey)
+ .forEach(r -> r.notify(topic, event));
+ }
+
+ private Long getServiceId(Map<String, Object> properties) {
+ return standardConverter().convert(properties.get(Constants.SERVICE_ID)).to(Long.class);
+ }
+
+ void addRemoteEventBus(RemoteEventBus remote, Map<String, Object> properties) {
+ doAdd(remote, properties);
+ updateRemoteInterest();
+ }
+
+ private void doAdd(RemoteEventBus remote, Map<String, Object> properties) {
+ Object consumed = properties.get(RemoteEventBus.REMOTE_EVENT_FILTERS);
+
+ if (consumed == null) {
+ // TODO log a broken behaviour
+ return;
+ }
+
+ Map<String, Filter> topicsToFilters = standardConverter().convert(consumed).to(LIST_OF_STRINGS)
+ .stream()
+ .map(s -> s.split("=", 2))
+ .collect(toMap(s -> s[0], s -> safeCreateFilter(s[1])));
+
+ Long serviceId = getServiceId(properties);
+
+ List<String> interestedTopics = topicsToFilters.keySet().stream().collect(toList());
+ synchronized (lock) {
+ remoteBuses.put(serviceId, remote);
+ remoteTopicInterests.put(serviceId, interestedTopics);
+
+ interestedTopics.forEach(s -> {
+ Map<RemoteEventBus, Filter> perTopicMap = eventTypeToRemotes
+ .computeIfAbsent(s, x -> new HashMap<>());
+ perTopicMap.put(remote, topicsToFilters.get(s));
+ });
+ }
+ }
+
+ private Filter safeCreateFilter(String filterString) {
+ try {
+ return FrameworkUtil.createFilter(filterString);
+ } catch (InvalidSyntaxException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ try {
+ return FrameworkUtil.createFilter("(&(x=true)(x=false))");
+ } catch (InvalidSyntaxException e1) {
+ // TODO log properly
+ throw new RuntimeException("Serious problem!");
+ }
+ }
+ }
+
+ void updatedRemoteEventBus(Map<String, Object> properties) {
+ Long serviceId = getServiceId(properties);
+ synchronized (lock) {
+ RemoteEventBus remote = remoteBuses.get(serviceId);
+ doRemove(remote, properties);
+ doAdd(remote, properties);
+ }
+ updateRemoteInterest();
+ }
+
+ void removeRemoteEventBus(RemoteEventBus remote, Map<String, Object> properties) {
+ doRemove(remote, properties);
+ updateRemoteInterest();
+ }
+
+ private void doRemove(RemoteEventBus remote, Map<String, Object> properties) {
+ Long serviceId = getServiceId(properties);
+
+ synchronized (lock) {
+ remoteBuses.remove(serviceId);
+ List<String> consumed = remoteTopicInterests.remove(serviceId);
+ if(consumed != null) {
+ consumed.forEach(s -> {
+ Map<RemoteEventBus, ?> perTopic = eventTypeToRemotes.get(s);
+ if(perTopic != null) {
+ perTopic.remove(remote);
+ if(perTopic.isEmpty()) {
+ eventTypeToRemotes.remove(s);
+ }
+ }
+ });
+ }
+ }
+ }
+
+ private void updateRemoteInterest() {
+
+ Map<String, String> targets;
+ synchronized (lock) {
+ targets = eventTypeToRemotes.entrySet().stream()
+ .collect(Collectors.toMap(Entry::getKey,
+ e -> e.getValue().values().stream()
+ .map(f -> f == null ? "" : f.toString())
+ .reduce("", this::mergeFilterStrings)));
+
+ }
+
+ updateTargets(targets);
+ }
+
+ private String mergeFilterStrings(String a, String b) {
+ if(a == null || "".equals(a)) {
+ return b == null ? "" : b;
+ } else if (b == null || "".equals(b)) {
+ return a;
+ } else {
+ return "(|" + a + b + ")";
+ }
+ }
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
new file mode 100644
index 0000000..67f63e4
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImpl.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
+import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE;
+
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.apache.aries.typedevent.remote.api.RemoteEventConstants;
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.osgi.annotation.bundle.Capability;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.annotations.RequireTypedEvents;
+
+/**
+ * This class implements {@link RemoteEventBus} and is responsible for receiving
+ * events from remote frameworks and publishing them in the local framework
+ */
+@Capability(namespace=SERVICE_NAMESPACE, attribute="objectClass:List<String>=org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus", uses=RemoteEventBus.class)
+@RequireTypedEvents
+public class RemoteEventBusImpl implements RemoteEventBus {
+
+ private final TypedEventBus eventBus;
+
+ private ServiceRegistration<RemoteEventBus> reg;
+
+ private Map<String, Filter> topicsToFilters = new HashMap<>();
+
+ private final Map<Long, Map<String, Filter>> servicesToInterests = new HashMap<>();
+
+ private final Object lock = new Object();
+
+ public RemoteEventBusImpl(TypedEventBus eventBus) {
+ this.eventBus = eventBus;
+ }
+
+ public void init(BundleContext ctx) {
+ ServiceRegistration<RemoteEventBus> reg = ctx.registerService(RemoteEventBus.class, this, null);
+
+ Map<String, Filter> filters;
+ synchronized(lock) {
+ this.reg = reg;
+ filters = topicsToFilters;
+ }
+ updateReg(filters);
+ }
+
+ public void destroy() {
+ try {
+ ServiceRegistration<?> reg;
+ synchronized (lock) {
+ reg = this.reg;
+ this.reg = null;
+ }
+
+ if(reg != null) {
+ reg.unregister();
+ }
+ } catch (IllegalStateException ise) {
+ // TODO log
+ }
+ }
+
+ @Override
+ public void notify(String topic, Map<String, Object> properties) {
+
+ boolean hasTopicInterest;
+ Filter filter;
+ synchronized (lock) {
+ hasTopicInterest = topicsToFilters.containsKey(topic);
+ filter = topicsToFilters.get(topic);
+ }
+
+ if(hasTopicInterest) {
+ if(filter == null || filter.matches(properties)) {
+ properties.put(RemoteEventConstants.REMOTE_EVENT_MARKER, Boolean.TRUE);
+ eventBus.deliverUntyped(topic, properties);
+ } else {
+ //TODO log filter mismatch
+ }
+ } else {
+ // TODO log topic mismatch
+ }
+ }
+
+ /**
+ * Update the data structures and registration to reflect the topic interests
+ * of the local framework
+ *
+ * @param id
+ * @param topics
+ * @param filter
+ */
+ void updateLocalInterest(Long id, List<String> topics, Filter filter) {
+
+ boolean doUpdate = false;
+
+ Map<String, Filter> newData = topics.stream()
+ .collect(toMap(identity(), x -> filter, (a,b) -> a));
+
+ Map<String, Filter> updatedFilters;
+ synchronized(lock) {
+ doUpdate = true;
+ servicesToInterests.put(id, newData);
+ topicsToFilters = getUpdatedFilters();
+ updatedFilters = topicsToFilters;
+ }
+
+ if(doUpdate) {
+ updateReg(updatedFilters);
+ }
+ }
+
+ private Map<String, Filter> getUpdatedFilters() {
+ synchronized (lock) {
+ return servicesToInterests.values().stream()
+ .flatMap(m -> m.entrySet().stream())
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue,
+ this::combineFilters));
+ }
+ }
+
+ private Filter combineFilters(Filter a, Filter b) {
+ if(a == null) {
+ return b;
+ } else if (b == null) {
+ return a;
+ } else {
+ try {
+ return FrameworkUtil.createFilter("(|" + a.toString() + b.toString() + ")");
+ } catch (InvalidSyntaxException e) {
+ // TODO Auto-generated catch block
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void updateReg(Map<String, Filter> filters) {
+
+ Hashtable<String, Object> props = new Hashtable<>();
+
+ props.put(Constants.SERVICE_EXPORTED_INTERFACES, RemoteEventBus.class.getName());
+ props.put(Constants.SERVICE_EXPORTED_INTENTS, "osgi.basic");
+ List<String> remoteFilters = filters.entrySet().stream()
+ .map(e -> e.getKey() + "=" + (e.getValue() == null ? "" : e.getValue().toString()))
+ .collect(toList());
+ props.put(REMOTE_EVENT_FILTERS, remoteFilters);
+
+
+ ServiceRegistration<?> reg;
+ synchronized (lock) {
+ reg = this.reg;
+ }
+
+ if(reg != null) {
+ // Only update if there is a change
+ Object existingFilters = reg.getReference().getProperty(REMOTE_EVENT_FILTERS);
+ if(!remoteFilters.equals(existingFilters)) {
+ reg.setProperties(props);
+ }
+ // Deal with a race condition if
+ Map<String, Filter> updatedFilters;
+ synchronized (lock) {
+ updatedFilters = topicsToFilters;
+ }
+ if(!updatedFilters.equals(filters)) {
+ updateReg(updatedFilters);
+ }
+ }
+ }
+
+ void removeLocalInterest(Long id) {
+
+ Map<String, Filter> updatedFilters;
+ synchronized(lock) {
+ if(servicesToInterests.remove(id) == null) {
+ return;
+ }
+ topicsToFilters = getUpdatedFilters();
+ updatedFilters = topicsToFilters;
+ }
+
+ updateReg(updatedFilters);
+ }
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
new file mode 100644
index 0000000..83b80dd
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteServiceEventsActivator.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.function.Function.identity;
+import static org.apache.aries.component.dsl.OSGi.all;
+import static org.apache.aries.component.dsl.OSGi.bundleContext;
+import static org.apache.aries.component.dsl.OSGi.coalesce;
+import static org.apache.aries.component.dsl.OSGi.configuration;
+import static org.apache.aries.component.dsl.OSGi.just;
+import static org.apache.aries.component.dsl.OSGi.once;
+import static org.apache.aries.component.dsl.OSGi.register;
+import static org.apache.aries.component.dsl.OSGi.service;
+import static org.apache.aries.component.dsl.OSGi.serviceReferences;
+import static org.apache.aries.typedevent.remote.spi.LocalEventConsumerManager.ARIES_LOCAL_EVENT_PROXY;
+import static org.osgi.framework.Constants.BUNDLE_ACTIVATOR;
+import static org.osgi.framework.Constants.SERVICE_ID;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.lang.reflect.ParameterizedType;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.aries.component.dsl.OSGi;
+import org.apache.aries.component.dsl.OSGiResult;
+import org.apache.aries.typedevent.remote.api.RemoteEventMonitor;
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.apache.aries.typedevent.remote.spi.RemoteEventMonitorImpl;
+import org.osgi.annotation.bundle.Header;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.TypedEventConstants;
+import org.osgi.service.typedevent.TypedEventHandler;
+import org.osgi.service.typedevent.UntypedEventHandler;
+import org.osgi.service.typedevent.monitor.TypedEventMonitor;
+import org.osgi.util.converter.TypeReference;
+import org.osgi.util.tracker.ServiceTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Header(name = BUNDLE_ACTIVATOR, value = "${@class}")
+public class RemoteServiceEventsActivator implements BundleActivator {
+
+ private static final Logger _log = LoggerFactory.getLogger(RemoteServiceEventsActivator.class);
+
+ OSGiResult eventBus;
+
+ @Override
+ public void start(BundleContext bundleContext) throws Exception {
+ if (_log.isDebugEnabled()) {
+ _log.debug("Aries Remote Typed Events (Remote Services) Starting");
+ }
+
+ eventBus = coalesce(configuration("org.apache.aries.typedevent.remote.remoteservices"), just(Hashtable::new))
+ .map(this::toConfigProps).flatMap(configuration -> createProgram(configuration)).run(bundleContext);
+
+ if (_log.isDebugEnabled()) {
+ _log.debug("Aries Typed Event Bus Started");
+ }
+ }
+
+ private OSGi<?> createProgram(Map<String, ?> configuration) {
+
+ OSGi<Object> monitor = service(once(serviceReferences(TypedEventMonitor.class)))
+ .map(RemoteEventMonitorImpl::new)
+ .flatMap(remi -> register(RemoteEventMonitor.class, remi, new HashMap<>()));
+
+ OSGi<Object> remote = bundleContext().flatMap(ctx -> service(once(serviceReferences(TypedEventBus.class)))
+ .map(RemoteEventBusImpl::new).effects(rebi -> rebi.init(ctx), rebi -> rebi.destroy())
+ .flatMap(rebi -> all(
+ just(new UntypedEventTracker(ctx, rebi)).map(ServiceTracker.class::cast)
+ .effects(st -> st.open(), st -> st.close()),
+ just(new TypedEventTracker(ctx, rebi)).map(ServiceTracker.class::cast).effects(st -> st.open(),
+ st -> st.close()))));
+
+ OSGi<Object> local = bundleContext()
+ .flatMap(ctx -> just(new LocalEventBusForwarder()).effects(lebf -> lebf.start(ctx), lebf -> lebf.stop())
+ .flatMap(lebf -> serviceReferences(RemoteEventBus.class, "(service.imported=true)", csr -> {
+ lebf.updatedRemoteEventBus(getServiceProps(csr.getServiceReference()));
+ return false;
+ }).flatMap(csr -> service(csr).effects(
+ reb -> lebf.addRemoteEventBus(reb, getServiceProps(csr.getServiceReference())),
+ reb -> lebf.removeRemoteEventBus(reb, getServiceProps(csr.getServiceReference()))))));
+
+ return all(monitor, remote, local);
+ }
+
+ private Map<String, Object> toConfigProps(Dictionary<String, ?> config) {
+ Enumeration<String> keys = config.keys();
+ Map<String, Object> map = new HashMap<>();
+ while (keys.hasMoreElements()) {
+ String key = keys.nextElement();
+ map.put(key, config.get(key));
+ }
+ return map;
+ }
+
+ private Map<String, Object> getServiceProps(ServiceReference<?> ref) {
+ return Arrays.stream(ref.getPropertyKeys()).collect(Collectors.toMap(identity(), ref::getProperty));
+ }
+
+ @Override
+ public void stop(BundleContext context) throws Exception {
+ if (_log.isDebugEnabled()) {
+ _log.debug("Aries Typed Event Bus Stopping");
+ }
+
+ eventBus.close();
+
+ if (_log.isDebugEnabled()) {
+ _log.debug("Aries Typed Event Bus Stopped");
+ }
+ }
+
+ private static final TypeReference<List<String>> LIST_OF_STRINGS = new TypeReference<List<String>>() {
+ };
+
+ private static Long getServiceId(ServiceReference<?> ref) {
+ return standardConverter().convert(ref.getProperty(SERVICE_ID)).to(Long.class);
+ }
+
+ private static List<String> getTopics(ServiceReference<?> ref) {
+ return standardConverter().convert(ref.getProperty(TYPED_EVENT_TOPICS)).to(LIST_OF_STRINGS);
+ }
+
+ private static Filter getFilter(ServiceReference<?> ref) throws InvalidSyntaxException {
+ String filter = standardConverter().convert(ref.getProperty(TYPED_EVENT_FILTER)).to(String.class);
+ if (filter == null || "".equals(filter)) {
+ return null;
+ } else {
+ return FrameworkUtil.createFilter(filter);
+ }
+ }
+
+ private static class UntypedEventTracker extends ServiceTracker<UntypedEventHandler, Object> {
+
+ private final RemoteEventBusImpl impl;
+
+ public UntypedEventTracker(BundleContext context, RemoteEventBusImpl impl) {
+ super(context, UntypedEventHandler.class, null);
+ this.impl = impl;
+ }
+
+ @Override
+ public Object addingService(ServiceReference<UntypedEventHandler> reference) {
+
+ if(TRUE.equals(reference.getProperty(ARIES_LOCAL_EVENT_PROXY))) {
+ // Ignore remote interest proxies
+ return null;
+ }
+
+ Filter filter;
+ try {
+ filter = getFilter(reference);
+ } catch (InvalidSyntaxException e) {
+ // TODO Auto-generated catch block
+ return reference;
+ }
+ impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+ return reference;
+ }
+
+ @Override
+ public void modifiedService(ServiceReference<UntypedEventHandler> reference, Object service) {
+ Filter filter;
+ try {
+ filter = getFilter(reference);
+ } catch (InvalidSyntaxException e) {
+ // TODO Auto-generated catch block
+ impl.removeLocalInterest(getServiceId(reference));
+ return;
+ }
+ impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+ }
+
+ @Override
+ public void removedService(ServiceReference<UntypedEventHandler> reference, Object service) {
+ impl.removeLocalInterest(getServiceId(reference));
+ }
+ };
+
+ @SuppressWarnings("rawtypes")
+ private static class TypedEventTracker extends ServiceTracker<TypedEventHandler, TypedEventHandler> {
+
+ private final RemoteEventBusImpl impl;
+
+ public TypedEventTracker(BundleContext context, RemoteEventBusImpl impl) {
+ super(context, TypedEventHandler.class, null);
+ this.impl = impl;
+ }
+
+ @Override
+ public TypedEventHandler addingService(ServiceReference<TypedEventHandler> reference) {
+ TypedEventHandler toReturn = context.getService(reference);
+ Filter filter;
+ try {
+ filter = getFilter(reference);
+ } catch (InvalidSyntaxException e) {
+ // TODO Auto-generated catch block
+ return toReturn;
+ }
+ List<String> topics = findTopics(reference, toReturn);
+ if (!topics.isEmpty()) {
+ impl.updateLocalInterest(getServiceId(reference), topics, filter);
+ }
+ return toReturn;
+ }
+
+ private List<String> findTopics(ServiceReference<TypedEventHandler> reference, TypedEventHandler service) {
+ List<String> topics = getTopics(reference);
+ if (topics.isEmpty()) {
+ Object type = reference.getProperty(TypedEventConstants.TYPED_EVENT_TYPE);
+ if (type != null) {
+ topics = Collections.singletonList(String.valueOf(type).replace(".", "/"));
+ } else {
+ Class<?> clazz = discoverTypeForTypedHandler(service);
+ if (clazz != null) {
+ topics = Collections.singletonList(clazz.getName().replace(".", "/"));
+ }
+ }
+ }
+ return topics;
+ }
+
+ /**
+ * Extensively copied from the Core Event Bus - is there a better way to share
+ * this?
+ *
+ * @param handler
+ * @param properties
+ * @return
+ */
+ private Class<?> discoverTypeForTypedHandler(TypedEventHandler<?> handler) {
+ Class<?> clazz = null;
+ Class<?> toCheck = handler.getClass();
+ while (clazz == null) {
+ clazz = findDirectlyImplemented(toCheck);
+
+ if (clazz != null) {
+ break;
+ }
+
+ clazz = processInterfaceHierarchyForClass(toCheck);
+
+ if (clazz != null) {
+ break;
+ }
+
+ toCheck = toCheck.getSuperclass();
+ }
+
+ return clazz;
+ }
+
+ private Class<?> processInterfaceHierarchyForClass(Class<?> toCheck) {
+ Class<?> clazz = null;
+ for (Class<?> iface : toCheck.getInterfaces()) {
+ clazz = findDirectlyImplemented(iface);
+
+ if (clazz != null) {
+ break;
+ }
+
+ clazz = processInterfaceHierarchyForClass(iface);
+
+ if (clazz != null) {
+ break;
+ }
+ }
+ return clazz;
+ }
+
+ private Class<?> findDirectlyImplemented(Class<?> toCheck) {
+ return Arrays.stream(toCheck.getGenericInterfaces()).filter(ParameterizedType.class::isInstance)
+ .map(ParameterizedType.class::cast).filter(t -> TypedEventHandler.class.equals(t.getRawType()))
+ .map(t -> t.getActualTypeArguments()[0]).findFirst().map(Class.class::cast).orElse(null);
+ }
+
+ @Override
+ public void modifiedService(ServiceReference<TypedEventHandler> reference, TypedEventHandler service) {
+ Filter filter;
+ try {
+ filter = getFilter(reference);
+ } catch (InvalidSyntaxException e) {
+ // TODO Auto-generated catch block
+ impl.removeLocalInterest(getServiceId(reference));
+ return;
+ }
+
+ List<String> topics = findTopics(reference, service);
+ if (topics.isEmpty()) {
+ impl.removeLocalInterest(getServiceId(reference));
+ } else {
+ impl.updateLocalInterest(getServiceId(reference), getTopics(reference), filter);
+ }
+ }
+
+ @Override
+ public void removedService(ServiceReference<TypedEventHandler> reference, TypedEventHandler service) {
+ impl.removeLocalInterest(getServiceId(reference));
+ }
+ };
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/RemoteEventBus.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/RemoteEventBus.java
new file mode 100644
index 0000000..2c0336e
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/RemoteEventBus.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.spi;
+
+import java.util.Map;
+
+import org.osgi.annotation.versioning.ProviderType;
+
+/**
+ * This interface should not be used by typical users of the
+ * Typed Event specification. It is intended to be a bridge
+ * between different mechanisms for broadcasting remote events
+ */
+
+@ProviderType
+public interface RemoteEventBus {
+
+ /**
+ * This service property provides a String+ containing <topic>=<filter>
+ * entries indicating the events that the remote nodes are interested in.
+ */
+ public static final String REMOTE_EVENT_FILTERS = "remote.event.filters";
+
+ /**
+ * Called to notify this instance of an event from a remote framework
+ */
+ public void notify(String topic, Map<String, Object> eventData);
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/package-info.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/package-info.java
new file mode 100644
index 0000000..ed6cdcc
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/main/java/org/apache/aries/typedevent/remote/remoteservices/spi/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@org.osgi.annotation.bundle.Export
+@org.osgi.annotation.versioning.Version("0.0.1")
+package org.apache.aries.typedevent.remote.remoteservices.spi;
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/common/TestEvent.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/common/TestEvent.java
new file mode 100644
index 0000000..3d6dc4e
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/common/TestEvent.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.common;
+
+public class TestEvent {
+ public String message;
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
new file mode 100644
index 0000000..9ddd8e1
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/impl/RemoteEventBusImplTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.impl;
+
+import static java.util.Collections.emptyList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.osgi.framework.FrameworkUtil.createFilter;
+
+import java.util.Arrays;
+import java.util.Dictionary;
+
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.typedevent.TypedEventBus;
+
+@SuppressWarnings("unchecked")
+public class RemoteEventBusImplTest {
+
+ @Mock
+ BundleContext context;
+
+ @Mock
+ ServiceRegistration<RemoteEventBus> remoteReg;
+ @Mock
+ ServiceReference<RemoteEventBus> remoteRef;
+
+ @Mock
+ TypedEventBus eventBusImpl;
+
+ RemoteEventBusImpl remoteImpl;
+
+ private AutoCloseable mocks;
+
+ @BeforeEach
+ public void start() {
+
+ mocks = MockitoAnnotations.openMocks(this);
+
+ Mockito.when(context.registerService(Mockito.eq(RemoteEventBus.class),
+ Mockito.any(RemoteEventBus.class), Mockito.any())).thenReturn(remoteReg);
+ Mockito.when(remoteReg.getReference()).thenReturn(remoteRef);
+
+ remoteImpl = new RemoteEventBusImpl(eventBusImpl);
+ }
+
+
+ @AfterEach
+ public void destroy() throws Exception {
+ remoteImpl.destroy();
+ mocks.close();
+ }
+
+ @Test
+ public void testEmptyStart() {
+ remoteImpl.init(context);
+
+ ArgumentCaptor<Dictionary<String, Object>> propsCaptor = ArgumentCaptor.forClass(Dictionary.class);
+
+ Mockito.verify(context).registerService(Mockito.eq(RemoteEventBus.class), Mockito.same(remoteImpl),
+ propsCaptor.capture());
+
+ Dictionary<String, Object> props = propsCaptor.getValue();
+ assertNull(props);
+
+ Mockito.verify(remoteReg).setProperties(propsCaptor.capture());
+
+ props = propsCaptor.getValue();
+
+ assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+ assertEquals(emptyList(), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+ }
+
+ @Test
+ public void testStartWithDetails() throws InvalidSyntaxException {
+
+ remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
+
+ remoteImpl.init(context);
+
+ ArgumentCaptor<Dictionary<String, Object>> propsCaptor = ArgumentCaptor.forClass(Dictionary.class);
+
+ Mockito.verify(context).registerService(Mockito.eq(RemoteEventBus.class), Mockito.same(remoteImpl),
+ propsCaptor.capture());
+
+ Dictionary<String, Object> props = propsCaptor.getValue();
+ assertNull(props);
+
+ Mockito.verify(remoteReg).setProperties(propsCaptor.capture());
+
+ props = propsCaptor.getValue();
+
+ assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+ assertEquals(Arrays.asList("FOO=(fizz=buzz)"), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+ }
+
+ @Test
+ public void testLateRegisterOfListener() throws InvalidSyntaxException {
+ remoteImpl.init(context);
+
+ ArgumentCaptor<Dictionary<String, Object>> propsCaptor = ArgumentCaptor.forClass(Dictionary.class);
+
+ Mockito.verify(context).registerService(Mockito.eq(RemoteEventBus.class), Mockito.same(remoteImpl),
+ propsCaptor.capture());
+
+ Dictionary<String, Object> props = propsCaptor.getValue();
+ assertNull(props);
+
+ Mockito.verify(remoteReg).setProperties(propsCaptor.capture());
+
+ props = propsCaptor.getValue();
+
+ assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+ assertEquals(emptyList(), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+
+ // Add a listener to the remote
+
+ remoteImpl.updateLocalInterest(42L, Arrays.asList("FOO"), createFilter("(fizz=buzz)"));
+
+ Mockito.verify(remoteReg, Mockito.times(2)).setProperties(propsCaptor.capture());
+
+ props = propsCaptor.getValue();
+
+ assertEquals(RemoteEventBus.class.getName(), props.get("service.exported.interfaces"));
+ assertEquals(Arrays.asList("FOO=(fizz=buzz)"), props.get(RemoteEventBus.REMOTE_EVENT_FILTERS));
+ }
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/AbstractIntegrationTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/AbstractIntegrationTest.java
new file mode 100644
index 0000000..dfcf586
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/AbstractIntegrationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.osgi;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.typedevent.remote.remoteservices.common.TestEvent;
+import org.junit.jupiter.api.AfterEach;
+import org.mockito.ArgumentMatcher;
+import org.osgi.framework.ServiceRegistration;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ *
+ * It can interact with the framework by starting or stopping bundles,
+ * getting or registering services, or in other ways, and then observing
+ * the result on the bundle(s) being tested.
+ */
+public abstract class AbstractIntegrationTest {
+
+ protected static final String TEST_EVENT_TOPIC = TestEvent.class.getName().replace(".", "/");
+
+
+ protected final List<ServiceRegistration<?>> regs = new ArrayList<ServiceRegistration<?>>();
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ regs.forEach(sr -> {
+ try {
+ sr.unregister();
+ } catch (Exception e) { }
+ });
+ }
+
+ protected ArgumentMatcher<TestEvent> isTestEventWithMessage(String message) {
+ return new ArgumentMatcher<TestEvent>() {
+
+ @Override
+ public boolean matches(TestEvent argument) {
+ return message.equals(argument.message);
+ }
+ };
+ }
+
+ protected ArgumentMatcher<Map<String, Object>> isUntypedTestEventWithMessage(String message) {
+ return new ArgumentMatcher<Map<String, Object>>() {
+
+ @Override
+ public boolean matches(Map<String, Object> argument) {
+ return argument != null && message.equals(argument.get("message"));
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
new file mode 100644
index 0000000..9788848
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/src/test/java/org/apache/aries/typedevent/remote/remoteservices/osgi/RemoteEventBusIntegrationTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.remoteservices.osgi;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
+
+import java.io.File;
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.ServiceLoader;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.aries.typedevent.remote.remoteservices.common.TestEvent;
+import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.BundleException;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceException;
+import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.framework.launch.Framework;
+import org.osgi.framework.launch.FrameworkFactory;
+import org.osgi.framework.wiring.FrameworkWiring;
+import org.osgi.service.typedevent.TypedEventBus;
+import org.osgi.service.typedevent.UnhandledEventHandler;
+import org.osgi.service.typedevent.UntypedEventHandler;
+import org.osgi.test.common.annotation.InjectBundleContext;
+import org.osgi.test.common.annotation.InjectService;
+import org.osgi.test.junit5.context.BundleContextExtension;
+import org.osgi.test.junit5.service.ServiceExtension;
+import org.osgi.util.tracker.ServiceTracker;
+
+/**
+ * This is a JUnit test that will be run inside an OSGi framework.
+ *
+ * It can interact with the framework by starting or stopping bundles, getting
+ * or registering services, or in other ways, and then observing the result on
+ * the bundle(s) being tested.
+ */
+@ExtendWith(BundleContextExtension.class)
+@ExtendWith(ServiceExtension.class)
+public class RemoteEventBusIntegrationTest extends AbstractIntegrationTest {
+
+ private static final String REMOTE_BUS = RemoteEventBus.class.getName();
+ private static final String UNTYPED_HANDLER = UntypedEventHandler.class.getName();
+ private static final String UNHANDLED_HANDLER = UnhandledEventHandler.class.getName();
+ private Map<UUID, Framework> frameworks;
+ private Map<UUID, ServiceTracker<?,?>> remoteServicePublishers = new ConcurrentHashMap<>();
+
+ @InjectBundleContext
+ BundleContext bundleContext;
+
+ @InjectService
+ TypedEventBus bus;
+
+ @Mock
+ UntypedEventHandler untypedEventHandler;
+
+ @Mock
+ UnhandledEventHandler unhandledEventHandler;
+
+ AutoCloseable mocks;
+
+ @BeforeEach
+ public void setUpFrameworks() throws Exception {
+ mocks = MockitoAnnotations.openMocks(this);
+
+ assertNotNull(bundleContext, "OSGi Bundle tests must be run inside an OSGi framework");
+
+ frameworks = createFrameworks(2);
+ frameworks.put(getMasterFrameworkUUID(), bundleContext.getBundle(0).adapt(Framework.class));
+
+ for (Entry<UUID, Framework> entry : frameworks.entrySet()) {
+ Framework f = entry.getValue();
+
+ BundleContext context = f.getBundleContext();
+ ServiceTracker<Object, Object> tracker = createCrossFrameworkPublisher(entry, context);
+
+ remoteServicePublishers.put(entry.getKey(), tracker);
+ }
+ }
+
+ private ServiceTracker<Object, Object> createCrossFrameworkPublisher(Entry<UUID, Framework> entry,
+ BundleContext context) {
+ ServiceTracker<Object, Object> tracker = new ServiceTracker<Object, Object>(context,
+ REMOTE_BUS, null) {
+
+ Map<UUID, ServiceRegistration<?>> registered = new ConcurrentHashMap<>();
+
+ @Override
+ public Object addingService(ServiceReference<Object> reference) {
+
+ if(reference.getBundle().getBundleId() == 0) {
+ return null;
+ }
+
+ Object service = super.addingService(reference);
+
+ for (Entry<UUID, Framework> e : frameworks.entrySet()) {
+ UUID fwkId = entry.getKey();
+ if(fwkId.equals(e.getKey())) {
+ // Skip this framework as it's the same framework the service came from
+ continue;
+ }
+
+ Framework fw = e.getValue();
+
+ registered.put(fwkId, fw.getBundleContext().registerService(
+ REMOTE_BUS, new EventHandlerFactory(service, REMOTE_BUS),
+ getRegistrationProps(reference)));
+ }
+
+ return service;
+ }
+
+ Dictionary<String, Object> getRegistrationProps(ServiceReference<?> ref) {
+ Dictionary<String, Object> toReturn = new Hashtable<String, Object>();
+ String[] props = ref.getPropertyKeys();
+ for(String key : props) {
+ toReturn.put(key, ref.getProperty(key));
+ }
+
+ toReturn.put("service.imported", true);
+ return toReturn;
+ }
+
+ @Override
+ public void modifiedService(ServiceReference<Object> reference, Object service) {
+ for(ServiceRegistration<?> reg : registered.values()) {
+ reg.setProperties(getRegistrationProps(reference));
+ }
+ }
+
+ @Override
+ public void removedService(ServiceReference<Object> reference, Object service) {
+ for (ServiceRegistration<?> registration : registered.values()) {
+ try {
+ registration.unregister();
+ } catch (Exception e) {
+ // Never mind
+ }
+ }
+ registered.clear();
+ super.removedService(reference, service);
+ }
+
+ };
+ tracker.open(true);
+ return tracker;
+ }
+
+ @AfterEach
+ public void shutdownFrameworks() throws Exception {
+
+ frameworks.remove(getMasterFrameworkUUID());
+
+ remoteServicePublishers.values().forEach(ServiceTracker::close);
+ remoteServicePublishers.clear();
+
+ frameworks.values().forEach(f -> {
+ try {
+ f.stop();
+ } catch (BundleException be) {
+ // Never mind
+ }
+ });
+
+ frameworks.clear();
+
+ mocks.close();
+ }
+
+ private Map<UUID, Framework> createFrameworks(int size) throws BundleException {
+
+ FrameworkFactory ff = ServiceLoader.load(FrameworkFactory.class,
+ FrameworkFactory.class.getClassLoader()).iterator().next();
+
+ List<String> locations = new ArrayList<>();
+
+ for(Bundle b : bundleContext.getBundles()) {
+ if(
+ b.getSymbolicName().equals("org.apache.aries.typedevent.bus") ||
+ b.getSymbolicName().equals("org.apache.aries.typedevent.remote.api") ||
+ b.getSymbolicName().equals("org.apache.aries.typedevent.remote.spi") ||
+ b.getSymbolicName().equals("org.apache.aries.typedevent.remote.remoteservices") ||
+ b.getSymbolicName().equals("org.apache.aries.component-dsl.component-dsl") ||
+ b.getSymbolicName().equals("org.apache.felix.converter") ||
+ b.getSymbolicName().equals("org.apache.felix.configadmin") ||
+ b.getSymbolicName().equals("org.osgi.service.typedevent") ||
+ b.getSymbolicName().equals("org.osgi.util.function") ||
+ b.getSymbolicName().equals("org.osgi.util.promise") ||
+ b.getSymbolicName().equals("org.osgi.util.pushstream") ||
+ b.getSymbolicName().equals("slf4j.api") ||
+ b.getSymbolicName().startsWith("ch.qos.logback")) {
+ locations.add(b.getLocation());
+ }
+ }
+
+ Map<UUID, Framework> frameworks = new HashMap<UUID, Framework>();
+ for(int i = 1; i < size; i++) {
+ Map<String, String> fwConfig = new HashMap<>();
+ fwConfig.put(Constants.FRAMEWORK_STORAGE, new File(bundleContext.getDataFile(""), "Test-Cluster" + i).getAbsolutePath());
+ fwConfig.put(Constants.FRAMEWORK_STORAGE_CLEAN, Constants.FRAMEWORK_STORAGE_CLEAN_ONFIRSTINIT);
+ Framework f = ff.newFramework(fwConfig);
+ f.init();
+ for(String s : locations) {
+ f.getBundleContext().installBundle(s);
+ }
+ f.start();
+ f.adapt(FrameworkWiring.class).resolveBundles(Collections.emptySet());
+ for(Bundle b : f.getBundleContext().getBundles()) {
+ if(b.getHeaders().get(Constants.FRAGMENT_HOST) == null) {
+ b.start();
+ }
+ }
+ frameworks.put(getUUID(f), f);
+ }
+ return frameworks;
+ }
+
+ private UUID getMasterFrameworkUUID() {
+ return UUID.fromString(bundleContext.getProperty(Constants.FRAMEWORK_UUID));
+ }
+
+ private UUID getUUID(Framework f) {
+ return UUID.fromString(f.getBundleContext().getProperty(Constants.FRAMEWORK_UUID));
+ }
+
+
+ public static class EventHandlerFactory implements ServiceFactory<Object> {
+
+ private final Object delegate;
+ private final String typeToMimic;
+
+ public EventHandlerFactory(Object delegate, String typeToMimic) {
+ this.delegate = delegate;
+ this.typeToMimic = typeToMimic;
+ }
+
+ @Override
+ public Object getService(Bundle bundle, ServiceRegistration<Object> registration) {
+
+ try {
+ Class<?> loadClass = bundle.loadClass(typeToMimic);
+
+ return Proxy.newProxyInstance(loadClass.getClassLoader(), new Class<?>[] {loadClass},
+ (o,m,a) -> {
+
+ if(m.getName().startsWith("notify") && m.getParameterTypes().length > 0) {
+ return delegate.getClass().getMethod(m.getName(), m.getParameterTypes())
+ .invoke(delegate, a);
+ } else {
+ return m.invoke(delegate, a);
+ }
+ });
+
+ } catch (Exception e) {
+ throw new ServiceException("failed to create service", e);
+ }
+ }
+
+ @Override
+ public void ungetService(Bundle bundle, ServiceRegistration<Object> registration, Object service) {
+ // TODO Auto-generated method stub
+
+ }
+
+ }
+
+ @Test
+ public void testSendToRemoteFramework() throws InterruptedException {
+
+ Dictionary<String, Object> props = new Hashtable<>();
+ regs.add(bundleContext.registerService(UNHANDLED_HANDLER, unhandledEventHandler, props));
+
+ TestEvent event = new TestEvent();
+ event.message = "boo";
+
+ bus.deliver(event);
+
+
+ verify(unhandledEventHandler, Mockito.after(100).times(1))
+ .notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+
+
+ BundleContext remoteContext = frameworks.values().stream()
+ .filter(fw -> !getUUID(fw).equals(getMasterFrameworkUUID()))
+ .flatMap(fw -> Arrays.stream(fw.getBundleContext().getBundles()))
+ .filter(b -> b.getSymbolicName().equals("org.osgi.service.typedevent"))
+ .map(Bundle::getBundleContext)
+ .findFirst()
+ .get();
+
+ props = new Hashtable<>();
+ props.put(TYPED_EVENT_TOPICS, TEST_EVENT_TOPIC);
+ props.put(TYPED_EVENT_FILTER, "(message=boo)");
+
+ regs.add(remoteContext.registerService(UNTYPED_HANDLER,
+ new EventHandlerFactory(untypedEventHandler, UNTYPED_HANDLER), props));
+
+
+ bus.deliver(event);
+
+ verify(unhandledEventHandler, Mockito.after(1000).times(1))
+ .notifyUnhandled(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+
+ verify(untypedEventHandler)
+ .notifyUntyped(eq(TEST_EVENT_TOPIC), argThat(isUntypedTestEventWithMessage("boo")));
+ }
+
+}
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
new file mode 100644
index 0000000..704e7b4
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.remoteservices/test.bndrun
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+-tester: biz.aQute.tester.junit-platform
+
+-runfw: org.apache.felix.framework
+
+-runrequires: bnd.identity;id="org.apache.aries.typedevent.remote.remoteservices-tests",\
+ bnd.identity;id="junit-jupiter-engine",\
+ bnd.identity;id="junit-platform-launcher"
+
+-runsystempackages: sun.reflect
+
+-resolve.effective: active
+-runbundles: \
+ ch.qos.logback.classic;version='[1.2.3,1.2.4)',\
+ ch.qos.logback.core;version='[1.2.3,1.2.4)',\
+ org.apache.aries.component-dsl.component-dsl;version='[1.2.2,1.2.3)',\
+ org.apache.felix.configadmin;version='[1.9.18,1.9.19)',\
+ org.apache.felix.converter;version='[1.0.14,1.0.15)',\
+ org.osgi.service.typedevent;version='[1.0.0,1.0.1)',\
+ org.osgi.util.function;version='[1.1.0,1.1.1)',\
+ org.osgi.util.promise;version='[1.1.1,1.1.2)',\
+ org.osgi.util.pushstream;version='[1.0.1,1.0.2)',\
+ slf4j.api;version='[1.7.30,1.7.31)',\
+ junit-jupiter-api;version='[5.6.2,5.6.3)',\
+ junit-platform-commons;version='[1.6.2,1.6.3)',\
+ net.bytebuddy.byte-buddy;version='[1.10.13,1.10.14)',\
+ net.bytebuddy.byte-buddy-agent;version='[1.10.13,1.10.14)',\
+ org.mockito.mockito-core;version='[3.5.10,3.5.11)',\
+ org.objenesis;version='[3.1.0,3.1.1)',\
+ org.opentest4j;version='[1.2.0,1.2.1)',\
+ org.osgi.test.common;version='[0.9.0,0.9.1)',\
+ org.osgi.test.junit5;version='[0.9.0,0.9.1)',\
+ junit-platform-engine;version='[1.6.2,1.6.3)',\
+ junit-platform-launcher;version='[1.6.2,1.6.3)',\
+ junit-jupiter-engine;version='[5.6.2,5.6.3)',\
+ org.apache.aries.typedevent.remote.api;version='[0.0.1,0.0.2)',\
+ org.apache.aries.typedevent.remote.remoteservices;version='[0.0.1,0.0.2)',\
+ org.apache.aries.typedevent.remote.remoteservices-tests;version='[0.0.1,0.0.2)',\
+ org.apache.aries.typedevent.remote.spi;version='[0.0.1,0.0.2)',\
+ org.apache.aries.typedevent.bus;version='[0.0.1,0.0.2)'
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/pom.xml b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/pom.xml
new file mode 100644
index 0000000..a288407
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/pom.xml
@@ -0,0 +1,50 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.typedevent</groupId>
+ <artifactId>typedevent-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.aries.typedevent.remote</groupId>
+ <artifactId>org.apache.aries.typedevent.remote.spi</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.typedevent</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.annotation</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.annotation.bundle</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.converter</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.aries.typedevent.remote</groupId>
+ <artifactId>org.apache.aries.typedevent.remote.api</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>biz.aQute.bnd</groupId>
+ <artifactId>bnd-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/LocalEventConsumerManager.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/LocalEventConsumerManager.java
new file mode 100644
index 0000000..ff3a13e
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/LocalEventConsumerManager.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.spi;
+
+import static java.lang.Boolean.TRUE;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
+import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
+
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.typedevent.UntypedEventHandler;
+
+/**
+ * A simple helper class used to manage the registrations of {@link UntypedEventHandler}
+ * services in the local service registry, used to feed events into the remote events
+ * implementation.
+ *
+ * Implementations should extend this type and override the {@link #notifyUntyped(String, Map)} method
+ * to receive events. The set of events received can be altered by calling {@link #updateTargets(Map)}.
+ */
+public abstract class LocalEventConsumerManager implements UntypedEventHandler {
+
+ /**
+ * A service property indicating that the event handler is a proxy created for a remote node and so
+ * should not be considered as a local interest.
+ */
+ public static final String ARIES_LOCAL_EVENT_PROXY = "org.apache.aries.typedevent.remote.spi.local.proxy";
+
+ /**
+ * A filter to exclude local proxy interests from remote nodes
+ */
+ public static final String ARIES_LOCAL_EVENT_PROXY_EXCLUSION_FILTER = "(!(" + ARIES_LOCAL_EVENT_PROXY + "=true))";
+
+ private final Object lock = new Object();
+ private final Map<String, ServiceRegistration<UntypedEventHandler>> listenerRegistrations = new HashMap<>();
+ private final Map<String, String> topicsToFilters = new HashMap<>();
+ private BundleContext ctx;
+
+ /**
+ * Starts this manager, registering any necessary whiteboard services with the
+ * appropriate topic and filters;
+ * @param ctx
+ */
+ public final void start(BundleContext ctx) {
+ synchronized (lock) {
+ this.ctx = ctx;
+ }
+ updateServiceRegistrations();
+ }
+
+ /**
+ * Stops this manager, unregistering any whiteboard services
+ */
+ public final void stop() {
+ synchronized (lock) {
+ this.ctx = null;
+ }
+ Map<String, ServiceRegistration<UntypedEventHandler>> toUnregister;
+ synchronized (lock) {
+ toUnregister = new HashMap<>(listenerRegistrations);
+ listenerRegistrations.clear();
+ }
+ toUnregister.values().stream().forEach(this::safeUnregister);
+ }
+
+
+ private void updateServiceRegistrations() {
+ Map<String, String> possibleUpdates = new HashMap<String, String>();
+ Map<String, ServiceRegistration<UntypedEventHandler>> toUnregister;
+ synchronized (lock) {
+ possibleUpdates = new HashMap<>(topicsToFilters);
+ toUnregister = listenerRegistrations.entrySet().stream()
+ .filter(e -> !topicsToFilters.containsKey(e.getKey()))
+ .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+ listenerRegistrations.keySet().removeAll(toUnregister.keySet());
+ }
+
+ toUnregister.values().stream().forEach(this::safeUnregister);
+
+ for (Entry<String, String> entry : possibleUpdates.entrySet()) {
+
+ String topic = entry.getKey();
+ String filter = entry.getValue();
+
+ ServiceRegistration<UntypedEventHandler> reg;
+ BundleContext ctx;
+ synchronized (lock) {
+ reg = listenerRegistrations.get(topic);
+ ctx = this.ctx;
+ }
+
+ if(reg == null) {
+ if(ctx != null) {
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put(TYPED_EVENT_TOPICS, topic);
+ props.put(ARIES_LOCAL_EVENT_PROXY, TRUE);
+ if(filter != null && !filter.contentEquals("")) {
+ props.put(TYPED_EVENT_FILTER, filter);
+ }
+ reg = ctx.registerService(UntypedEventHandler.class, this, props);
+
+ synchronized (lock) {
+ ServiceRegistration<UntypedEventHandler> oldReg = listenerRegistrations.putIfAbsent(topic, reg);
+ if(oldReg == null) {
+ reg = null;
+ }
+ }
+ if(reg != null) {
+ reg.unregister();
+ }
+ }
+ } else if(ctx != null) {
+
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put(TYPED_EVENT_TOPICS, topic);
+ props.put(ARIES_LOCAL_EVENT_PROXY, TRUE);
+ if(filter != null && !filter.contentEquals("")) {
+ if(filter.equals(reg.getReference().getProperty(TYPED_EVENT_FILTER))) {
+ // Filter unchanged - no need to update
+ continue;
+ }
+ props.put(TYPED_EVENT_FILTER, filter);
+ } else if (reg.getReference().getProperty(TYPED_EVENT_FILTER) == null) {
+ // Filter unchanged - no need to update
+ continue;
+ }
+ reg.setProperties(props);
+ }
+ }
+
+ boolean changed;
+ synchronized (lock) {
+ changed = !possibleUpdates.equals(topicsToFilters);
+ }
+ if(changed) {
+ updateServiceRegistrations();
+ }
+ }
+
+ private void safeUnregister(ServiceRegistration<?> reg) {
+ try {
+ reg.unregister();
+ } catch (IllegalStateException ise) {
+ // Just ignore it
+ }
+ }
+
+ /**
+ * Set the topic and filter targets for which whiteboard listeners
+ * should be registered
+ * @param updated - A Map of topic names (or globs) to filters
+ */
+ protected final void updateTargets(Map<String, String> updated) {
+ synchronized (lock) {
+ topicsToFilters.clear();
+ topicsToFilters.putAll(updated);
+ }
+
+ updateServiceRegistrations();
+ }
+
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/RemoteEventMonitorImpl.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/RemoteEventMonitorImpl.java
new file mode 100644
index 0000000..61ef87f
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/RemoteEventMonitorImpl.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aries.typedevent.remote.spi;
+
+import static org.apache.aries.typedevent.remote.api.RemoteEventConstants.REMOTE_EVENT_MARKER;
+
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.aries.typedevent.remote.api.FilterDTO;
+import org.apache.aries.typedevent.remote.api.RemoteEventMonitor;
+import org.apache.aries.typedevent.remote.api.RemoteMonitorEvent;
+import org.apache.aries.typedevent.remote.api.RemoteMonitorEvent.PublishType;
+import org.osgi.framework.Filter;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.service.typedevent.monitor.MonitorEvent;
+import org.osgi.service.typedevent.monitor.TypedEventMonitor;
+import org.osgi.util.converter.Converters;
+import org.osgi.util.function.Predicate;
+import org.osgi.util.pushstream.PushStream;
+
+public class RemoteEventMonitorImpl implements RemoteEventMonitor {
+
+ private final TypedEventMonitor monitor;
+
+ public RemoteEventMonitorImpl(TypedEventMonitor monitor) {
+
+ this.monitor = monitor;
+ }
+
+ private static RemoteMonitorEvent toRemoteEvent(MonitorEvent event) {
+
+ Object remoteMarker = event.eventData.get(REMOTE_EVENT_MARKER);
+
+ RemoteMonitorEvent me = Converters.standardConverter().convert(event).sourceAsDTO().targetAsDTO().to(RemoteMonitorEvent.class);
+ me.publishType = Boolean.valueOf(String.valueOf(remoteMarker)) ? PublishType.REMOTE : PublishType.LOCAL;
+
+ return me;
+ }
+
+ @Override
+ public PushStream<RemoteMonitorEvent> monitorEvents(FilterDTO... filters) {
+ return monitorEvents(0, filters);
+ }
+
+ @Override
+ public PushStream<RemoteMonitorEvent> monitorEvents(int history, FilterDTO...filters) {
+ return monitor.monitorEvents(history)
+ .map(RemoteEventMonitorImpl::toRemoteEvent)
+ .filter(createFilter(filters));
+ }
+
+ @Override
+ public PushStream<RemoteMonitorEvent> monitorEvents(Instant history, FilterDTO...filters) {
+ return monitor.monitorEvents(history)
+ .map(RemoteEventMonitorImpl::toRemoteEvent)
+ .filter(createFilter(filters));
+ }
+
+ private class FilterPair {
+ Filter ldap;
+ Pattern regex;
+
+ FilterPair(FilterDTO filter) {
+ if (filter.ldapExpression != null && !filter.ldapExpression.isEmpty()) {
+ try {
+ ldap = FrameworkUtil.createFilter(filter.ldapExpression);
+ } catch (InvalidSyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ if (filter.regularExpression != null && !filter.regularExpression.isEmpty()) {
+ regex = Pattern.compile(filter.regularExpression);
+ }
+ }
+ }
+
+ private Predicate<RemoteMonitorEvent> createFilter(FilterDTO... filters) {
+ List<FilterPair> filterPairs = Arrays.asList(filters).stream()
+ .map(FilterPair::new).collect(Collectors.toList());
+
+ if (filterPairs.isEmpty()) {
+ return x -> true;
+ }
+
+ return event -> {
+ // We use a TreeMap to ensure predictable ordering of keys
+ // This is important for the regex matching contract.
+
+ SortedMap<String, Object> toFilter = new TreeMap<>();
+
+ // Using a collector blew up with null values, even though they are
+ // supported by the TreeMap
+ event.eventData.entrySet().stream()
+ .flatMap(e -> flatten("", e))
+ .forEach(e -> toFilter.put(e.getKey(), e.getValue()));
+
+ toFilter.put("-topic", event.topic);
+ toFilter.put("-publishType", event.publishType);
+
+ StringBuilder eventText = new StringBuilder();
+
+ if (filterPairs.stream().anyMatch(p -> p.regex != null)) {
+ toFilter.forEach((k, v) -> {
+ eventText.append(k).append(':').append(v).append(',');
+ });
+ }
+
+ // If a FilterDTO contains both LDAP and regular expressions, then both must match.
+ return filterPairs.stream().anyMatch(p ->
+ (p.ldap == null || p.ldap.matches(toFilter)) &&
+ (p.regex == null || p.regex.matcher(eventText).find())
+ );
+ };
+ }
+
+ private Stream<Entry<String, Object>> flatten(String parentScope,
+ Entry<String, Object> entry) {
+
+ if (entry.getValue() instanceof Map) {
+
+ String keyPrefix = parentScope + entry.getKey() + ".";
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> subMap = (Map<String, Object>) entry.getValue();
+
+ // Recursively flatten maps that are inside our map
+ return subMap.entrySet().stream()
+ .flatMap(e -> flatten(keyPrefix, e));
+ } else if(parentScope.isEmpty()) {
+ // Fast path for top-level entries
+ return Stream.of(entry);
+ } else {
+ // Map the key of a nested entry into x.y.z
+ return Stream.of(new AbstractMap.SimpleEntry<>(
+ parentScope + entry.getKey(), entry.getValue()));
+ }
+
+ }
+
+}
diff --git a/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/package-info.java b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/package-info.java
new file mode 100644
index 0000000..a08926c
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/org.apache.aries.typedevent.remote.spi/src/main/java/org/apache/aries/typedevent/remote/spi/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@org.osgi.annotation.bundle.Export
+@org.osgi.annotation.versioning.Version("0.0.1")
+package org.apache.aries.typedevent.remote.spi;
\ No newline at end of file
diff --git a/org.apache.aries.typedevent.remote/pom.xml b/org.apache.aries.typedevent.remote/pom.xml
new file mode 100644
index 0000000..79ff373
--- /dev/null
+++ b/org.apache.aries.typedevent.remote/pom.xml
@@ -0,0 +1,21 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.typedevent</groupId>
+ <artifactId>typedevent-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.aries.typedevent.remote</groupId>
+ <artifactId>org.apache.aries.typedevent.remote</artifactId>
+ <packaging>pom</packaging>
+
+
+ <modules>
+ <module>org.apache.aries.typedevent.remote.api</module>
+ <module>org.apache.aries.typedevent.remote.spi</module>
+ <module>org.apache.aries.typedevent.remote.remoteservices</module>
+ </modules>
+</project>
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index dfb1c98..7e89a7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,6 +45,18 @@
<enabled>false</enabled>
</releases>
</repository>
+ <repository>
+ <id>Sonatype-snapshots</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ <snapshots>
+ <enabled>true</enabled>
+ <updatePolicy>daily</updatePolicy>
+ <checksumPolicy>ignore</checksumPolicy>
+ </snapshots>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ </repository>
</repositories>
<properties>
@@ -322,5 +334,6 @@
<modules>
<module>typedevent-test-bom</module>
<module>org.apache.aries.typedevent.bus</module>
+ <module>org.apache.aries.typedevent.remote</module>
</modules>
</project>