Add support for discovery using mDNS
diff --git a/discovery/mdns/pom.xml b/discovery/mdns/pom.xml
new file mode 100644
index 0000000..c6fd40d
--- /dev/null
+++ b/discovery/mdns/pom.xml
@@ -0,0 +1,43 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.aries.rsa</groupId>
+ <artifactId>org.apache.aries.rsa.discovery</artifactId>
+ <version>1.16.1-SNAPSHOT</version>
+ </parent>
+ <artifactId>org.apache.aries.rsa.discovery.mdns</artifactId>
+ <name>Aries Remote Service Admin Discovery Zookeeper</name>
+
+ <properties>
+ <topDirectoryLocation>../..</topDirectoryLocation>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.aries.rsa</groupId>
+ <artifactId>org.apache.aries.rsa.spi</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.component.annotations</artifactId>
+ <version>1.4.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.jaxrs</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.ws.rs</groupId>
+ <artifactId>jakarta.ws.rs-api</artifactId>
+ <version>2.1.6</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jmdns</groupId>
+ <artifactId>jmdns</artifactId>
+ <version>3.5.7</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java
new file mode 100644
index 0000000..ca97f28
--- /dev/null
+++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/Interest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.mdns;
+
+import static org.osgi.service.remoteserviceadmin.EndpointEvent.ADDED;
+import static org.osgi.service.remoteserviceadmin.EndpointEvent.MODIFIED;
+import static org.osgi.service.remoteserviceadmin.EndpointEvent.MODIFIED_ENDMATCH;
+import static org.osgi.service.remoteserviceadmin.EndpointEvent.REMOVED;
+import static org.osgi.service.remoteserviceadmin.EndpointEventListener.ENDPOINT_LISTENER_SCOPE;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.aries.rsa.util.StringPlus;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("deprecation")
+public class Interest {
+ private static final Logger LOG = LoggerFactory.getLogger(Interest.class);
+
+ private final Long id;
+ private final ConcurrentMap<String, EndpointDescription> added = new ConcurrentHashMap<>();
+ private final AtomicReference<List<String>> scopes = new AtomicReference<>();
+ private final Object epListener;
+
+
+ public Interest(Long id, Object epListener, Map<String, Object> props) {
+ this.id = id;
+ this.scopes.set(StringPlus.normalize(props.get(ENDPOINT_LISTENER_SCOPE)));
+ this.epListener = epListener;
+ }
+
+ public void update(Map<String, Object> props) {
+
+ List<String> newScopes = StringPlus.normalize(props.get(ENDPOINT_LISTENER_SCOPE));
+ List<String> oldScopes = this.scopes.getAndSet(newScopes);
+
+ added.values().removeIf(ed -> {
+ Optional<String> newScope = getFirstMatch(ed, newScopes);
+ Optional<String> oldScope = getFirstMatch(ed, oldScopes);
+ EndpointEvent event;
+ boolean remove;
+ String filter;
+ if(newScope.isPresent()) {
+ remove = false;
+ filter = newScope.get();
+ if(oldScope.isPresent() && oldScope.get().equals(filter)) {
+ event = null;
+ } else {
+ event = new EndpointEvent(MODIFIED, ed);
+ }
+ } else {
+ remove = true;
+ event = new EndpointEvent(REMOVED, ed);
+ filter = oldScope.orElse(null);
+ }
+
+ notifyListener(event, filter);
+
+ return remove;
+ });
+ }
+
+ public Object getEpListener() {
+ return epListener;
+ }
+
+ public void endpointChanged(EndpointDescription ed) {
+ List<String> scopes = this.scopes.get();
+ Optional<String> currentScope = getFirstMatch(ed, scopes);
+ boolean alreadyAdded = added.containsKey(ed.getId());
+ EndpointEvent event;
+ String filter;
+ if (currentScope.isPresent()) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Listener {} is interested in endpoint {}. It will be {}", id, ed, alreadyAdded ? "MODIFIED" : "ADDED");
+ }
+ added.put(ed.getId(), ed);
+ event = new EndpointEvent(alreadyAdded ? MODIFIED : ADDED, ed);
+ filter = currentScope.get();
+ } else if(alreadyAdded) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Listener {} is no longer interested in endpoint {}. It will be {}", id, ed, alreadyAdded ? "MODIFIED" : "ADDED");
+ }
+ EndpointDescription previous = added.remove(ed.getId());
+ event = new EndpointEvent(MODIFIED_ENDMATCH, ed);
+ filter = getFirstMatch(previous, scopes).orElse(null);
+ } else {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Listener {} not interested in endpoint {}", id, ed);
+ }
+ return;
+ }
+
+ notifyListener(event, filter);
+ }
+
+ public void endpointRemoved(String id) {
+ EndpointDescription previous = added.remove(id);
+ if(previous != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Endpoint {} is no longer available for listener {}", id, this.id);
+ }
+ notifyListener(new EndpointEvent(REMOVED, previous), getFirstMatch(previous, scopes.get()).orElse(null));
+ }
+ }
+
+ private void notifyListener(EndpointEvent event, String filter) {
+ if (epListener instanceof EndpointEventListener) {
+ notifyEEListener(event, filter, (EndpointEventListener)epListener);
+ } else if (epListener instanceof EndpointListener) {
+ notifyEListener(event, filter, (EndpointListener)epListener);
+ }
+ }
+
+ private Optional<String> getFirstMatch(EndpointDescription endpoint, List<String> scopes) {
+ return scopes.stream().filter(endpoint::matches).findFirst();
+ }
+
+ private void notifyEEListener(EndpointEvent event, String currentScope, EndpointEventListener listener) {
+ EndpointDescription endpoint = event.getEndpoint();
+ LOG.info("Calling endpointchanged on class {} for filter {}, type {}, endpoint {} ",
+ listener, currentScope, event.getType(), endpoint);
+ listener.endpointChanged(event, currentScope);
+ }
+
+ private void notifyEListener(EndpointEvent event, String currentScope, EndpointListener listener) {
+ EndpointDescription endpoint = event.getEndpoint();
+ LOG.info("Calling old listener on class {} for filter {}, type {}, endpoint {} ",
+ listener, currentScope, event.getType(), endpoint);
+ switch (event.getType()) {
+ case EndpointEvent.ADDED:
+ listener.endpointAdded(endpoint, currentScope);
+ break;
+
+ case EndpointEvent.MODIFIED:
+ listener.endpointRemoved(endpoint, currentScope);
+ listener.endpointAdded(endpoint, currentScope);
+ break;
+
+ case EndpointEvent.REMOVED:
+ listener.endpointRemoved(endpoint, currentScope);
+ break;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Interest [scopes=" + scopes + ", epListener=" + epListener.getClass() + "]";
+ }
+
+}
diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java
new file mode 100644
index 0000000..42f215d
--- /dev/null
+++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/InterestManager.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.mdns;
+
+import static java.util.Collections.singleton;
+import static java.util.stream.Collectors.toSet;
+import static java.util.stream.Stream.concat;
+import static org.apache.aries.rsa.discovery.mdns.PublishingEndpointListener.Subscription.ENDPOINT_REVOKED;
+import static org.apache.aries.rsa.discovery.mdns.PublishingEndpointListener.Subscription.ENDPOINT_UPDATED;
+
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.sse.InboundSseEvent;
+import javax.ws.rs.sse.SseEventSource;
+
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
+import org.osgi.service.jaxrs.client.SseEventSourceFactory;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the {@link EndpointEventListener}s and the scopes they are interested in.
+ * Establishes SSE event sources to be called back on all changes in the remote targets.
+ * Events are then forwarded to all interested {@link EndpointEventListener}s.
+ */
+@SuppressWarnings("deprecation")
+public class InterestManager {
+ private static final Logger LOG = LoggerFactory.getLogger(InterestManager.class);
+
+ private final ConcurrentMap<Long, Interest> interests = new ConcurrentHashMap<>();
+
+ private final SseEventSourceFactory eventSourceFactory;
+
+ private final EndpointDescriptionParser parser;
+
+ private final Client client;
+
+ private final ConcurrentMap<String, Set<EndpointDescription>> endpointsBySource = new ConcurrentHashMap<>();
+
+ private final ConcurrentMap<String, SseEventSource> streams = new ConcurrentHashMap<>();
+
+ public InterestManager(SseEventSourceFactory factory, EndpointDescriptionParser parser, Client client) {
+
+ this.eventSourceFactory = factory;
+ this.parser = parser;
+ this.client = client;
+
+ }
+
+ public void deactivate() {
+
+ streams.values().forEach(SseEventSource::close);
+ streams.clear();
+
+ interests.clear();
+ }
+
+ public void remoteAdded(String uri) {
+ if(streams.containsKey(uri)) {
+ return;
+ }
+
+ if(LOG.isInfoEnabled()) {
+ LOG.info("Discovered a remote at {}", uri);
+ }
+
+ SseEventSource sse = eventSourceFactory.newBuilder(client.target(uri)).build();
+ sse.register(i -> onEndpointEvent(uri, i), t -> lostRemoteStream(uri, t), () -> lostRemoteStream(uri, null));
+ streams.put(uri, sse);
+ sse.open();
+ }
+
+ public void remoteRemoved(String uri) {
+ if(LOG.isInfoEnabled()) {
+ LOG.info("Remote at {} is no longer present", uri);
+ }
+
+ SseEventSource sseEventSource = streams.remove(uri);
+ if(sseEventSource != null) {
+ sseEventSource.close();
+ }
+ }
+
+ private void onEndpointEvent(String source, InboundSseEvent event) {
+ String name = event.getName();
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Received a {} notification from {}", name, source);
+ }
+
+ if(ENDPOINT_UPDATED.equals(name)) {
+ EndpointDescription ed = parser.readEndpoint(event.readData(InputStream.class));
+ endpointsBySource.compute(source, (a,b) -> {
+ return b == null ? singleton(ed) : concat(b.stream(), Stream.of(ed)).collect(toSet());
+ });
+ interests.values().forEach(i -> i.endpointChanged(ed));
+ } else if (ENDPOINT_REVOKED.equals(name)) {
+ String id = event.readData();
+ endpointsBySource.compute(source, (a,b) -> {
+ if(b == null) {
+ return null;
+ } else {
+ Set<EndpointDescription> set = b.stream().filter(ed -> !ed.getId().equals(id)).collect(toSet());
+ return set.isEmpty() ? null : set;
+ }
+ });
+ interests.values().forEach(i -> i.endpointRemoved(id));
+ }
+ }
+
+ private void lostRemoteStream(String source, Throwable t) {
+
+ if(t != null) {
+ if(LOG.isWarnEnabled()) {
+ LOG.warn("The remote {} had a failure", source, t);
+ }
+ } else {
+ if(LOG.isInfoEnabled()) {
+ LOG.info("The remote {} has disconnected", source);
+ }
+ }
+
+ Set<EndpointDescription> remove = endpointsBySource.remove(source);
+ if(remove != null) {
+ remove.forEach(ed -> interests.values().forEach(i -> i.endpointRemoved(ed.getId())));
+ }
+ }
+
+ public void bindEndpointEventListener(EndpointEventListener epListener, Map<String, Object> props) {
+ addInterest(epListener, props);
+ }
+
+ public void updatedEndpointEventListener(Map<String, Object> props) {
+ updatedInterest(props);
+ }
+
+ public void unbindEndpointEventListener(Map<String, Object> props) {
+ interests.remove(getServiceId(props));
+ }
+
+ public void bindEndpointListener(EndpointListener epListener, Map<String, Object> props) {
+ addInterest(epListener, props);
+ }
+
+ public void updatedEndpointListener(Map<String, Object> props) {
+ updatedInterest(props);
+ }
+
+ public void unbindEndpointListener(Map<String, Object> props) {
+ interests.remove(getServiceId(props));
+ }
+
+ private Long getServiceId(Map<String, Object> props) {
+ return (Long) props.get("service.id");
+ }
+
+ private void addInterest(Object epListener, Map<String, Object> props) {
+
+ Long id = getServiceId(props);
+
+ if(LOG.isInfoEnabled()) {
+ LOG.info("Service {} has registered an interest in endpoint events", id);
+ }
+
+ Interest interest = new Interest(getServiceId(props), epListener, props);
+
+ interests.put(getServiceId(props), interest);
+ endpointsBySource.values().stream()
+ .flatMap(Set::stream)
+ .forEach(interest::endpointChanged);
+ }
+
+ private void updatedInterest(Map<String, Object> props) {
+
+ Long id = getServiceId(props);
+
+ if(LOG.isInfoEnabled()) {
+ LOG.info("Service {} has changed its interest in endpoint events", id);
+ }
+
+ interests.get(id).update(props);
+ }
+}
diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java
new file mode 100644
index 0000000..a217862
--- /dev/null
+++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/MdnsDiscovery.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.mdns;
+
+import static org.osgi.framework.Constants.FRAMEWORK_UUID;
+import static org.osgi.service.jaxrs.runtime.JaxrsServiceRuntimeConstants.JAX_RS_SERVICE_ENDPOINT;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.jmdns.JmDNS;
+import javax.jmdns.ServiceEvent;
+import javax.jmdns.ServiceInfo;
+import javax.jmdns.ServiceListener;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
+import org.apache.aries.rsa.util.StringPlus;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
+import org.osgi.service.jaxrs.client.SseEventSourceFactory;
+import org.osgi.service.jaxrs.runtime.JaxrsServiceRuntime;
+import org.osgi.service.jaxrs.runtime.dto.RuntimeDTO;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("deprecation")
+@org.apache.aries.rsa.annotations.RSADiscoveryProvider(protocols = "aries.mdns")
+@Component
+public class MdnsDiscovery {
+
+ private static final String _ARIES_DISCOVERY_HTTP_TCP_LOCAL = "_aries-discovery._tcp.local.";
+
+ private static final Logger LOG = LoggerFactory.getLogger(MdnsDiscovery.class);
+
+ private final Client client;
+
+ private final String fwUuid;
+
+ private final InterestManager interestManager;
+
+ private final PublishingEndpointListener publishingListener;
+
+ private JaxrsServiceRuntime runtime;
+
+ private JmDNS jmdns;
+
+
+ @Activate
+ public MdnsDiscovery(BundleContext ctx, @Reference SseEventSourceFactory eventSourceFactory,
+ @Reference ClientBuilder clientBuilder, @Reference EndpointDescriptionParser parser) {
+ this.client = clientBuilder.build();
+ this.interestManager = new InterestManager(eventSourceFactory, parser, client);
+ fwUuid = ctx.getProperty(FRAMEWORK_UUID);
+ this.publishingListener = new PublishingEndpointListener(parser, ctx, fwUuid);
+ }
+
+ @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
+ public void bindEndpointEventListener(EndpointEventListener epListener, Map<String, Object> props) {
+ interestManager.bindEndpointEventListener(epListener, props);
+ }
+
+ public void updatedEndpointEventListener(Map<String, Object> props) {
+ interestManager.updatedEndpointEventListener(props);
+ }
+
+ public void unbindEndpointEventListener(Map<String, Object> props) {
+ interestManager.unbindEndpointEventListener(props);
+ }
+
+ @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
+ public void bindEndpointListener(EndpointListener epListener, Map<String, Object> props) {
+ interestManager.bindEndpointListener(epListener, props);
+ }
+
+ public void updatedEndpointListener(Map<String, Object> props) {
+ interestManager.updatedEndpointListener(props);
+ }
+
+ public void unbindEndpointListener(Map<String, Object> props) {
+ interestManager.unbindEndpointListener(props);
+ }
+
+ @Reference(policy = ReferencePolicy.DYNAMIC)
+ public void bindJaxrsServiceRuntime(JaxrsServiceRuntime runtime) {
+ updateAndRegister(runtime);
+ }
+
+ public void updatedJaxrsServiceRuntime(JaxrsServiceRuntime runtime) {
+ updateAndRegister(runtime);
+ }
+
+ public void unbindJaxrsServiceRuntime(JaxrsServiceRuntime runtime) {
+ JmDNS jmdns = null;
+ synchronized (this) {
+ if(runtime == this.runtime) {
+ jmdns = this.jmdns;
+ this.runtime = null;
+ }
+ }
+
+ if(jmdns != null) {
+ jmdns.unregisterAllServices();
+ }
+ }
+
+ private void updateAndRegister(JaxrsServiceRuntime runtime) {
+ JmDNS jmdns;
+ synchronized (this) {
+ this.runtime = runtime;
+ jmdns = this.jmdns;
+ }
+
+ if(jmdns != null) {
+ RuntimeDTO runtimeDTO = runtime.getRuntimeDTO();
+ List<String> uris = StringPlus.normalize(runtimeDTO.serviceDTO.properties.get(JAX_RS_SERVICE_ENDPOINT));
+
+ if(uris == null || uris.isEmpty()) {
+ LOG.warn("Unable to advertise discovery as there are no endpoint URIs");
+ return;
+ }
+
+ String base = runtimeDTO.defaultApplication.base;
+ if(base == null) {
+ base = "";
+ }
+
+ base += "/aries/rsa/discovery";
+
+ URI uri = uris.stream()
+ .filter(s -> s.matches(".*(?:[0-9]{1,3}\\.){3}[0-9]{1,3}.*"))
+ .findFirst()
+ .map(URI::create)
+ .orElseGet(() -> URI.create(uris.get(0)));
+
+ Map<String, Object> props = new HashMap<>();
+ props.put("scheme", uri.getScheme() == null ? "" : uri.getScheme());
+ props.put("path", uri.getPath() == null ? base : uri.getPath() + base);
+ props.put("frameworkUuid", fwUuid);
+
+ ServiceInfo info = ServiceInfo.create(_ARIES_DISCOVERY_HTTP_TCP_LOCAL, fwUuid, uri.getPort(), 0, 0, props);
+
+ try {
+ jmdns.registerService(info);
+ } catch (IOException ioe) {
+ LOG.error("Unable to advertise discovery", ioe);
+ }
+ }
+ }
+
+ public static @interface Config {
+ public String bind_address();
+ }
+
+ @Activate
+ public void start(Config config) throws UnknownHostException, IOException {
+
+ String bind = config.bind_address();
+
+ JmDNS jmdns = JmDNS.create(bind == null ? null : InetAddress.getByName(bind));
+
+ JaxrsServiceRuntime runtime;
+ synchronized (this) {
+ this.jmdns = jmdns;
+ runtime = this.runtime;
+ }
+
+ if(runtime != null) {
+ updateAndRegister(runtime);
+ }
+
+ // Add a service listener
+ jmdns.addServiceListener(_ARIES_DISCOVERY_HTTP_TCP_LOCAL, new MdnsListener());
+
+ }
+
+ @Deactivate
+ public void stop () {
+ try {
+ jmdns.close();
+ } catch (IOException e) {
+ LOG.warn("An exception occurred closing the mdns discovery");
+ }
+
+ interestManager.deactivate();
+ publishingListener.stop();
+ }
+
+ private class MdnsListener implements ServiceListener {
+
+ private final ConcurrentMap<String, String> namesToUris = new ConcurrentHashMap<>();
+
+ @Override
+ public void serviceAdded(ServiceEvent event) {
+ }
+
+ @Override
+ public void serviceRemoved(ServiceEvent event) {
+ ServiceInfo info = event.getInfo();
+ if(info != null) {
+ String removed = namesToUris.remove(info.getKey());
+ if(removed != null) {
+ interestManager.remoteRemoved(removed);
+ }
+ }
+ }
+
+ @Override
+ public void serviceResolved(ServiceEvent event) {
+ ServiceInfo info = event.getInfo();
+
+ String infoUuid = info.getPropertyString("frameworkUuid");
+
+ if(infoUuid == null || infoUuid.equals(fwUuid)) {
+ // Ignore until we can see if this is for our own endpoint
+ return;
+ }
+
+ String scheme = info.getPropertyString("scheme");
+ if(scheme == null) {
+ scheme = "http";
+ }
+
+ String path = info.getPropertyString("path");
+ if(path == null) {
+ // Not a complete record yet
+ return;
+ }
+
+ int port = info.getPort();
+ if(port == -1) {
+ switch(scheme) {
+ case "http":
+ port = 80;
+ break;
+ case "https":
+ port = 443;
+ break;
+ default:
+ LOG.error("Unknown URI scheme advertised {} by framework {} on host {}",
+ scheme, info.getName(), info.getDomain());
+ }
+ }
+
+ String address = info.getInetAddresses()[0].getHostAddress();
+
+ String uri = String.format("%s://%s:%d/%s", scheme, address, port, path);
+
+ LOG.info("Discovered remote at {}", uri);
+
+ namesToUris.put(info.getKey(), uri);
+
+ interestManager.remoteAdded(uri);
+ }
+ }
+}
diff --git a/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java
new file mode 100644
index 0000000..d5f4b70
--- /dev/null
+++ b/discovery/mdns/src/main/java/org/apache/aries/rsa/discovery/mdns/PublishingEndpointListener.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.mdns;
+
+import static java.util.Collections.singleton;
+import static java.util.stream.Collectors.toSet;
+import static java.util.stream.Stream.concat;
+import static javax.ws.rs.core.MediaType.SERVER_SENT_EVENTS;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Stream;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.sse.Sse;
+import javax.ws.rs.sse.SseEventSink;
+
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceFactory;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.jaxrs.whiteboard.annotations.RequireJaxrsWhiteboard;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Listens for local {@link EndpointEvent}s using {@link EndpointEventListener} and old style {@link EndpointListener}
+ * and publishes changes to listeners using Server Sent Events (SSE)
+ */
+@SuppressWarnings("deprecation")
+@RequireJaxrsWhiteboard
+public class PublishingEndpointListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MdnsDiscovery.class);
+
+ private final String uuid;
+
+ private final EndpointDescriptionParser parser;
+
+ private final ServiceRegistration<?> listenerReg;
+ private final ServiceRegistration<?> resourceReg;
+
+ private final ConcurrentMap<String, SponsoredEndpoint> localEndpoints = new ConcurrentHashMap<>();
+
+ private final Set<Subscription> listeners = ConcurrentHashMap.newKeySet();
+
+ @SuppressWarnings("serial")
+ public PublishingEndpointListener(EndpointDescriptionParser parser, BundleContext bctx, String uuid) {
+ this.parser = parser;
+ this.uuid = uuid;
+ String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
+ Dictionary<String, Object> props = serviceProperties(uuid);
+ listenerReg = bctx.registerService(ifAr, new ListenerFactory(), props);
+ resourceReg = bctx.registerService(PublishingEndpointListener.class, this,
+ new Hashtable<String, Object>() {{put("osgi.jaxrs.resource", Boolean.TRUE);}});
+ }
+
+ @Deactivate
+ public void stop() {
+ listenerReg.unregister();
+ listeners.forEach(Subscription::close);
+ resourceReg.unregister();
+ }
+
+ private void endpointUpdate(Long bundleId, EndpointDescription ed, int type) {
+ String edFwUuid = ed.getFrameworkUUID();
+ if(edFwUuid == null || !edFwUuid.equals(uuid)) {
+ LOG.warn("This listener has been called with an endpoint {} for a remote framework {}", ed.getId(), edFwUuid);
+ return;
+ }
+ String id = ed.getId();
+ switch(type) {
+ case EndpointEvent.ADDED:
+ case EndpointEvent.MODIFIED:
+ localEndpoints.compute(id, (k,v) -> {
+ return v == null ? new SponsoredEndpoint(ed, singleton(bundleId)) :
+ new SponsoredEndpoint(ed, concat(v.sponsors.stream(), Stream.of(bundleId)).collect(toSet()));
+ });
+ String data = toEndpointData(ed);
+ listeners.forEach(s -> s.update(data));
+ break;
+ case EndpointEvent.MODIFIED_ENDMATCH:
+ case EndpointEvent.REMOVED:
+ boolean act = localEndpoints.compute(id, (k,v) -> {
+ if(v == null) {
+ return null;
+ } else {
+ Set<Long> updated = v.sponsors.stream().filter(l -> !bundleId.equals(l)).collect(toSet());
+ return updated.isEmpty() ? null : new SponsoredEndpoint(v.ed, updated);
+ }
+ }) == null;
+
+ if(act) {
+ listeners.forEach(s -> s.revoke(id));
+ }
+ break;
+ default:
+ LOG.error("Unknown event type {} for endpoint {}", type, ed);
+ }
+ }
+
+ private Dictionary<String, Object> serviceProperties(String uuid) {
+ String scope = String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS,
+ RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid);
+ Dictionary<String, Object> props = new Hashtable<>();
+ props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, scope);
+ return props;
+ }
+
+ private String toEndpointData(EndpointDescription ed) {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ parser.writeEndpoint(ed, baos);
+ return new String(baos.toByteArray(), StandardCharsets.UTF_8).replace("\n", "").replace("\r", "");
+ } catch (Exception e) {
+ LOG.error("Unable to serialize the endpoint {}", ed, e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @GET
+ @Produces(SERVER_SENT_EVENTS)
+ @Path("aries/rsa/discovery")
+ public void listen(@Context Sse sse, @Context SseEventSink sink) {
+ Subscription subscription = new Subscription(sse, sink);
+ listeners.add(subscription);
+
+ localEndpoints.values().stream()
+ .map(s -> toEndpointData(s.ed))
+ .forEach(subscription::update);
+ }
+
+ private class ListenerFactory implements ServiceFactory<PerClientEndpointEventListener> {
+
+ @Override
+ public PerClientEndpointEventListener getService(Bundle bundle,
+ ServiceRegistration<PerClientEndpointEventListener> registration) {
+ return new PerClientEndpointEventListener(bundle.getBundleId());
+ }
+
+ @Override
+ public void ungetService(Bundle bundle, ServiceRegistration<PerClientEndpointEventListener> registration,
+ PerClientEndpointEventListener service) {
+ Long bundleId = service.bundleId;
+ localEndpoints.values().stream()
+ .filter(s -> s.sponsors.contains(bundleId))
+ .forEach(s -> endpointUpdate(bundleId, s.ed, EndpointEvent.REMOVED));
+ }
+
+ }
+
+ private class PerClientEndpointEventListener implements EndpointEventListener, EndpointListener {
+
+ private final Long bundleId;
+
+ public PerClientEndpointEventListener(Long bundleId) {
+ super();
+ this.bundleId = bundleId;
+ }
+
+ @Override
+ public void endpointChanged(EndpointEvent event, String filter) {
+ endpointUpdate(bundleId, event.getEndpoint(), event.getType());
+ }
+
+ @Override
+ public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+ endpointUpdate(bundleId, endpoint, EndpointEvent.ADDED);
+ }
+
+ @Override
+ public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+ endpointUpdate(bundleId, endpoint, EndpointEvent.REMOVED);
+ }
+ }
+
+ class Subscription {
+
+ static final String ENDPOINT_UPDATED = "UPDATED";
+ static final String ENDPOINT_REVOKED = "REVOKED";
+
+ Sse sse;
+ SseEventSink eventSink;
+
+ public Subscription(Sse sse, SseEventSink eventSink) {
+ this.sse = sse;
+ this.eventSink = eventSink;
+ }
+
+ public void update(String endpointData) {
+ eventSink.send(sse.newEvent(ENDPOINT_UPDATED, endpointData))
+ .whenComplete(this::sendFailure);
+ }
+
+ public void revoke(String endpointId) {
+ eventSink.send(sse.newEvent(ENDPOINT_REVOKED, endpointId))
+ .whenComplete(this::sendFailure);
+ }
+
+ public void close() {
+ eventSink.close();
+ listeners.remove(this);
+ }
+
+ private void sendFailure(Object o, Throwable t) {
+ if(t != null) {
+ LOG.error("Failed to send endpoint message, closing");
+ listeners.remove(this);
+ eventSink.close();
+ }
+ }
+ }
+
+ private static class SponsoredEndpoint {
+ private final EndpointDescription ed;
+ private final Set<Long> sponsors;
+
+ public SponsoredEndpoint(EndpointDescription ed, Set<Long> sponsors) {
+ super();
+ this.ed = ed;
+ this.sponsors = sponsors;
+ }
+ }
+}
diff --git a/discovery/pom.xml b/discovery/pom.xml
index 82b8f87..496ed22 100644
--- a/discovery/pom.xml
+++ b/discovery/pom.xml
@@ -36,5 +36,6 @@
<module>zookeeper</module>
<module>config</module>
<module>command</module>
+ <module>mdns</module>
</modules>
</project>