ARIES-1780 - Redesign of zookeeper discovery using DS
diff --git a/.gitignore b/.gitignore
index ca2256e..9711ac3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,4 +11,5 @@
 *.md.html
 .idea/
 generated/
+.vscode/
 
diff --git a/discovery/local/bnd.bnd b/discovery/local/bnd.bnd
index 203f6a1..d20f035 100644
--- a/discovery/local/bnd.bnd
+++ b/discovery/local/bnd.bnd
@@ -15,13 +15,12 @@
 #    KIND, either express or implied.  See the License for the
 #    specific language governing permissions and limitations
 #    under the License.
-Bundle-Activator: org.apache.aries.rsa.discovery.local.Activator
 Private-Package: org.apache.aries.rsa.discovery.local
 Export-Package: \
-	org.apache.aries.rsa.discovery.endpoint,\
 	org.osgi.xmlns.rsa.v1_0
 Provide-Capability: osgi.extender;osgi.extender="osgi.remoteserviceadmin";\
 	version:Version="1.1.0";\
 	uses:="org.osgi.service.remoteserviceadmin",\
 	osgi.remoteserviceadmin.discovery;\
 	protocols:List<String>="local"; version:Version=1.1.0
+-privatepackage: org.apache.aries.rsa.discovery.endpoint
\ No newline at end of file
diff --git a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParser.java b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserImpl.java
similarity index 94%
rename from discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParser.java
rename to discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserImpl.java
index 9fcb8b4..5acd583 100644
--- a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParser.java
+++ b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserImpl.java
@@ -33,16 +33,19 @@
 import javax.xml.transform.Source;
 import javax.xml.transform.stream.StreamSource;
 
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
+import org.osgi.service.component.annotations.Component;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
 import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionsType;
 import org.osgi.xmlns.rsa.v1_0.ObjectFactory;
 import org.osgi.xmlns.rsa.v1_0.PropertyType;
 
-public class EndpointDescriptionParser {
+@Component
+public class EndpointDescriptionParserImpl implements EndpointDescriptionParser {
     private JAXBContext jaxbContext;
 
-    public EndpointDescriptionParser() {
+    public EndpointDescriptionParserImpl() {
         try {
             jaxbContext = JAXBContext.newInstance(EndpointDescriptionsType.class);
         } catch (JAXBException e) {
diff --git a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/Activator.java b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/Activator.java
deleted file mode 100644
index 3e9a187..0000000
--- a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/Activator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.local;
-
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.osgi.util.tracker.ServiceTracker;
-
-public class Activator implements BundleActivator {
-    private ServiceTracker<EndpointEventListener, EndpointEventListener> listenerTracker;
-    private LocalDiscovery localDiscovery;
-
-    public void start(BundleContext context) {
-        localDiscovery = new LocalDiscovery();
-        listenerTracker = new EPListenerTracker(context, localDiscovery);
-        listenerTracker.open();
-        localDiscovery.processExistingBundles(context.getBundles());
-        context.addBundleListener(localDiscovery);
-    }
-
-    public void stop(BundleContext context) {
-        listenerTracker.close();
-        context.removeBundleListener(localDiscovery);
-    }
-
-    private final class EPListenerTracker extends ServiceTracker<EndpointEventListener, EndpointEventListener> {
-        private final LocalDiscovery localDiscovery;
-    
-        private EPListenerTracker(BundleContext context, LocalDiscovery localDiscovery) {
-            super(context, EndpointEventListener.class, null);
-            this.localDiscovery = localDiscovery;
-        }
-    
-        @Override
-        public EndpointEventListener addingService(ServiceReference<EndpointEventListener> reference) {
-            EndpointEventListener service = super.addingService(reference);
-            localDiscovery.addListener(reference, service);
-            return service;
-        }
-    
-        @Override
-        public void modifiedService(ServiceReference<EndpointEventListener> reference, EndpointEventListener service) {
-            super.modifiedService(reference, service);
-            localDiscovery.removeListener(service);
-    
-            // This may cause duplicate registrations of remote services,
-            // but that's fine and should be filtered out on another level.
-            // See Remote Service Admin spec section 122.6.3
-            localDiscovery.addListener(reference, service);
-        }
-    
-        @Override
-        public void removedService(ServiceReference<EndpointEventListener> reference, EndpointEventListener service) {
-            super.removedService(reference, service);
-            localDiscovery.removeListener(service);
-        }
-    }
-
-    
-}
diff --git a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/EndpointDescriptionBundleParser.java b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/EndpointDescriptionBundleParser.java
index 1484dcb..1929b6c 100644
--- a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/EndpointDescriptionBundleParser.java
+++ b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/EndpointDescriptionBundleParser.java
@@ -25,7 +25,7 @@
 import java.util.Enumeration;
 import java.util.List;
 
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParserImpl;
 import org.osgi.framework.Bundle;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.slf4j.Logger;
@@ -37,10 +37,10 @@
     private static final String REMOTE_SERVICES_HEADER_NAME = "Remote-Service";
     private static final String REMOTE_SERVICES_DIRECTORY = "OSGI-INF/remote-service/";
 
-    private EndpointDescriptionParser parser;
+    private EndpointDescriptionParserImpl parser;
 
     public EndpointDescriptionBundleParser() {
-        parser = new EndpointDescriptionParser();
+        parser = new EndpointDescriptionParserImpl();
     }
 
     public List<EndpointDescription> getAllEndpointDescriptions(Bundle b) {
@@ -57,7 +57,7 @@
         return elements;
     }
     
-    Enumeration<URL> getEndpointDescriptionURLs(Bundle b) {
+    private Enumeration<URL> getEndpointDescriptionURLs(Bundle b) {
         String origDir = getRemoteServicesDir(b);
         
         // Split origDir into dir and file pattern
diff --git a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java
index adf98b4..22b1f2d 100644
--- a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java
+++ b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java
@@ -31,15 +31,21 @@
 
 import org.apache.aries.rsa.util.StringPlus;
 import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleEvent;
 import org.osgi.framework.BundleListener;
 import org.osgi.framework.Filter;
 import org.osgi.framework.FrameworkUtil;
 import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 
+@Component(immediate = true)
 public class LocalDiscovery implements BundleListener {
 
     // this is effectively a set which allows for multiple service descriptions with the
@@ -53,6 +59,18 @@
     public LocalDiscovery() {
         this.bundleParser = new EndpointDescriptionBundleParser();
     }
+    
+    @Activate
+    public void activate(BundleContext context) {
+        Bundle[] bundles = context.getBundles();
+        processExistingBundles(bundles);
+        context.addBundleListener(this);
+    }
+    
+    @Deactivate
+    public void deactivate(BundleContext context) {
+        context.removeBundleListener(this);
+    }
 
     public void processExistingBundles(Bundle[] bundles) {
         if (bundles == null) {
@@ -66,7 +84,8 @@
         }
     }
 
-    void addListener(ServiceReference<EndpointEventListener> endpointListenerRef, EndpointEventListener endpointListener) {
+    @Reference
+    void bindListener(ServiceReference<EndpointEventListener> endpointListenerRef, EndpointEventListener endpointListener) {
         List<String> filters = StringPlus.normalize(endpointListenerRef.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE));
         if (filters.isEmpty()) {
             return;
@@ -93,7 +112,7 @@
      * itself to clean up any orphans. See Remote Service Admin spec 122.6.3
      * @param endpointListener
      */
-    void removeListener(EndpointEventListener endpointListener) {
+    void unbindListener(EndpointEventListener endpointListener) {
         synchronized (listenerToFilters) {
             Collection<String> filters = listenerToFilters.remove(endpointListener);
             if (filters == null) {
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserTest.java b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserTest.java
index 12ead94..9dcefc0 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserTest.java
+++ b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserTest.java
@@ -31,7 +31,7 @@
     @Test
     public void testEndpointDescriptionsFromURL() throws IOException {
         URL ed1URL = getClass().getResource("/ed1.xml");
-        List<EndpointDescription> edElements = new EndpointDescriptionParser().
+        List<EndpointDescription> edElements = new EndpointDescriptionParserImpl().
             readEndpoints(ed1URL.openStream());
         Assert.assertEquals(4, edElements.size());
     }
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/PropertiesMapperTest.java b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/PropertiesMapperTest.java
index ffd1dbc..31fa1b7 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/PropertiesMapperTest.java
+++ b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/PropertiesMapperTest.java
@@ -80,7 +80,7 @@
 
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         EndpointDescription epd = new EndpointDescription(m);
-        new EndpointDescriptionParser().writeEndpoint(epd, bos);
+        new EndpointDescriptionParserImpl().writeEndpoint(epd, bos);
         byte[] epData = bos.toByteArray();
         System.out.println(new String(epData));
         URL edURL = getClass().getResource("/ed2-generated.xml");
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/LocalDiscoveryTest.java b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/LocalDiscoveryTest.java
index 59b3c3f..20afdf6 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/LocalDiscoveryTest.java
+++ b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/LocalDiscoveryTest.java
@@ -103,7 +103,7 @@
         endpointListener.endpointChanged(EasyMock.anyObject(EndpointEvent.class), EasyMock.eq("(objectClass=*)"));
         EasyMock.expectLastCall();
         EasyMock.replay(endpointListener);
-        ld.addListener(sr, endpointListener);
+        ld.bindListener(sr, endpointListener);
 
         // Start the bundle
         BundleEvent be = new BundleEvent(BundleEvent.STARTED, bundle);
@@ -148,7 +148,7 @@
         // Add the EndpointListener Service
         assertEquals("Precondition failed", 0, ld.listenerToFilters.size());
         assertEquals("Precondition failed", 0, ld.filterToListeners.size());
-        ld.addListener(sr, el);
+        ld.bindListener(sr, el);
 
         assertEquals(1, ld.listenerToFilters.size());
         assertEquals(Collections.singletonList("(objectClass=org.example.ClassA)"), ld.listenerToFilters.get(el));
@@ -175,8 +175,8 @@
         }).times(2);
         EasyMock.replay(el);
 
-        ld.removeListener(el);
-        ld.addListener(sr2, el);
+        ld.unbindListener(el);
+        ld.bindListener(sr2, el);
         assertEquals(1, ld.listenerToFilters.size());
         assertEquals(Arrays.asList("(|(objectClass=org.example.ClassA)(objectClass=org.example.ClassB))"),
             ld.listenerToFilters.get(el));
@@ -189,7 +189,7 @@
         assertEquals(expectedEndpoints, actualEndpoints);
 
         // Remove the EndpointListener Service
-        ld.removeListener(el);
+        ld.unbindListener(el);
         assertEquals(0, ld.listenerToFilters.size());
         assertEquals(0, ld.filterToListeners.size());
     }
@@ -215,7 +215,7 @@
 
         assertEquals("Precondition failed", 0, ld.listenerToFilters.size());
         assertEquals("Precondition failed", 0, ld.filterToListeners.size());
-        ld.addListener(sr, endpointListener);
+        ld.bindListener(sr, endpointListener);
 
         assertEquals(1, ld.listenerToFilters.size());
         assertEquals(Collections.singletonList("(objectClass=Aaaa)"), ld.listenerToFilters.get(endpointListener));
@@ -235,7 +235,7 @@
 
         EndpointEventListener endpointListener2 = EasyMock.createMock(EndpointEventListener.class);
         EasyMock.replay(endpointListener2);
-        ld.addListener(sr2, endpointListener2);
+        ld.bindListener(sr2, endpointListener2);
 
         assertEquals(2, ld.listenerToFilters.size());
         assertEquals(Collections.singletonList("(objectClass=Aaaa)"), ld.listenerToFilters.get(endpointListener));
@@ -260,7 +260,7 @@
 
         EndpointEventListener endpointListener3 = EasyMock.createMock(EndpointEventListener.class);
         EasyMock.replay(endpointListener3);
-        ld.addListener(sr3, endpointListener3);
+        ld.bindListener(sr3, endpointListener3);
 
         assertEquals(3, ld.listenerToFilters.size());
         assertEquals(Collections.singletonList("(objectClass=Aaaa)"), ld.listenerToFilters.get(endpointListener));
@@ -287,11 +287,11 @@
         assertEquals(1, ld.listenerToFilters.size());
         assertEquals(2, ld.filterToListeners.size());
         assertEquals(1, ld.filterToListeners.values().iterator().next().size());
-        ld.removeListener(EasyMock.createMock(EndpointEventListener.class));
+        ld.unbindListener(EasyMock.createMock(EndpointEventListener.class));
         assertEquals(1, ld.listenerToFilters.size());
         assertEquals(2, ld.filterToListeners.size());
         assertEquals(1, ld.filterToListeners.values().iterator().next().size());
-        ld.removeListener(endpointListener);
+        ld.unbindListener(endpointListener);
         assertEquals(0, ld.listenerToFilters.size());
         assertEquals(0, ld.filterToListeners.size());
     }
diff --git a/discovery/zookeeper/bnd.bnd b/discovery/zookeeper/bnd.bnd
index 4a8cc29..4f06393 100644
--- a/discovery/zookeeper/bnd.bnd
+++ b/discovery/zookeeper/bnd.bnd
@@ -16,3 +16,4 @@
 #    under the License.
 Provide-Capability: osgi.remoteserviceadmin.discovery;\
 	protocols:List<String>="zookeeper"; version:Version=1.1.0
+Export-Package: org.apache.aries.rsa.discovery.zookeeper
\ No newline at end of file
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
index 9ba3fd2..e6bcf6f 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
@@ -18,14 +18,15 @@
  */
 package org.apache.aries.rsa.discovery.zookeeper;
 
+import static java.util.concurrent.CompletableFuture.runAsync;
+
 import java.io.IOException;
 import java.util.Hashtable;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.ZooTrace;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
@@ -40,16 +41,16 @@
 @Component(//
         service = ClientManager.class,
         immediate = true,
-        configurationPid = "org.apache.aries.rsa.discovery.zookeeper", //
+        configurationPid = ClientManager.DISCOVERY_ZOOKEEPER_ID, //
         configurationPolicy = ConfigurationPolicy.REQUIRE //
 )
 public class ClientManager implements Watcher {
-
+    public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.aries.rsa.discovery.zookeeper";
     private static final Logger LOG = LoggerFactory.getLogger(ClientManager.class);
 
     private ZooKeeper zkClient;
     private DiscoveryConfig config;
-    private ServiceRegistration<ZooKeeper> reg;
+    private volatile ServiceRegistration<ZooKeeper> reg;
     private BundleContext context;
 
     @Activate
@@ -81,11 +82,7 @@
         if (reg != null) {
             reg.unregister();
         }
-        CompletableFuture.runAsync(new Runnable() {
-            public void run() {
-                closeClient();
-            }
-        });
+        runAsync(this::closeClient);
     }
 
     private void closeClient() {
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterestManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java
similarity index 63%
rename from discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterestManager.java
rename to discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java
index 6e92c78..41bce86 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterestManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java
@@ -16,16 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
+package org.apache.aries.rsa.discovery.zookeeper;
 
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
 import org.apache.aries.rsa.util.StringPlus;
 import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
@@ -39,10 +42,12 @@
  * Events from repository are then forwarded to all interested EndpointEventListeners.
  */
 @SuppressWarnings({"deprecation", "rawtypes"})
-public class InterestManager implements EndpointEventListener {
+@Component(service = InterestManager.class)
+public class InterestManager {
     private static final Logger LOG = LoggerFactory.getLogger(InterestManager.class);
 
-    private final ZookeeperEndpointRepository repository;
+    private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<>();
+    
     private final Map<ServiceReference, Interest> interests = new ConcurrentHashMap<>();
 
     protected static class Interest {
@@ -50,11 +55,51 @@
         Object epListener;
     }
 
-    public InterestManager(ZookeeperEndpointRepository repository) {
-        this.repository = repository;
+    @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
+    public void bindEndpointEventListener(ServiceReference<EndpointEventListener> sref, EndpointEventListener epListener) {
+        addInterest(sref, epListener);
+    }
+    
+    public void updatedEndpointEventListener(ServiceReference<EndpointEventListener> sref, EndpointEventListener epListener) {
+        addInterest(sref, epListener);
+    }
+    
+    public void unbindEndpointEventListener(ServiceReference<EndpointEventListener> sref) {
+        removeInterest(sref);
     }
 
-    public void addInterest(ServiceReference<?> sref, Object epListener) {
+    @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
+    public void bindEndpointListener(ServiceReference<EndpointListener> sref, EndpointListener epListener) {
+        addInterest(sref, epListener);
+    }
+    
+    public void updatedEndpointListener(ServiceReference<EndpointListener> sref, EndpointListener epListener) {
+        addInterest(sref, epListener);
+    }
+    
+    public void unbindEndpointListener(ServiceReference<EndpointListener> sref) {
+        removeInterest(sref);
+    }
+
+    private void removeInterest(ServiceReference<?> sref) {
+        if (interests.containsKey(sref)) {
+            List<String> scopes = getScopes(sref);
+            LOG.info("removing interests: {}", scopes);
+            interests.remove(sref);
+        }
+    }
+    
+    /**
+     * Read current endpoint stored at a znode
+     * 
+     * @param path
+     * @return
+     */
+    EndpointDescription read(String path) {
+        return nodes.get(path);
+    }
+
+    private void addInterest(ServiceReference<?> sref, Object epListener) {
         if (isOurOwnEndpointEventListener(sref)) {
             LOG.debug("Skipping our own EndpointEventListener");
             return;
@@ -71,28 +116,40 @@
             interest.scopes = scopes;
             interests.put(sref, interest);
             sendExistingEndpoints(scopes, epListener);
+        } else {
+            interest.scopes = scopes;
+            sendExistingEndpoints(scopes, epListener);
         }
     }
 
     private void sendExistingEndpoints(List<String> scopes, Object epListener) {
-        for (EndpointDescription endpoint : repository.getAll()) {
+        for (EndpointDescription endpoint : nodes.values()) {
             EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
             notifyListener(event, scopes, epListener);
         }
     }
 
-    private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) {
+    private static boolean isOurOwnEndpointEventListener(ServiceReference<?> endpointEventListener) {
         return Boolean.parseBoolean(String.valueOf(
-                EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
+                endpointEventListener.getProperty(ClientManager.DISCOVERY_ZOOKEEPER_ID)));
+    }
+    
+    public void handleRemoved(String path) {
+        EndpointDescription endpoint = nodes.remove(path);
+        if (endpoint != null) {
+            EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
+            endpointChanged(event);
+        }
     }
 
-    public void removeInterest(ServiceReference<EndpointEventListener> epListenerRef) {
-        LOG.info("removing EndpointEventListener interests: {}", epListenerRef);
-        interests.remove(epListenerRef);
+    public void handleChanged(String path, EndpointDescription endpoint) {
+        EndpointDescription old = nodes.put(path, endpoint);
+        int type = old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED;
+        EndpointEvent event = new EndpointEvent(type, endpoint);
+        endpointChanged(event);
     }
 
-    @Override
-    public void endpointChanged(EndpointEvent event, String filter) {
+    private void endpointChanged(EndpointEvent event) {
         for (Interest interest : interests.values()) {
             notifyListener(event, interest.scopes, interest.epListener);
         }
@@ -146,7 +203,9 @@
         }
     }
 
+    @Deactivate
     public synchronized void close() {
+        nodes.clear();
         interests.clear();
     }
 
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java
new file mode 100644
index 0000000..89e1556
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+/**
+ * Listens for local EndpointEvents using old and new style listeners and publishes changes to 
+ * the ZooKeeperEndpointRepository
+ */
+@SuppressWarnings("deprecation")
+@Component(service = {}, immediate = true)
+public class PublishingEndpointListener implements EndpointEventListener, EndpointListener {
+
+    private ServiceRegistration<?> listenerReg;
+    
+    @Reference
+    private ZookeeperEndpointPublisher repository;
+
+    @Activate
+    public void start(BundleContext bctx) {
+        String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
+        String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
+        Dictionary<String, String> props = serviceProperties(uuid);
+        listenerReg = bctx.registerService(ifAr, this, props);
+    }
+
+    @Deactivate
+    public void stop() {
+        listenerReg.unregister();
+    }
+
+    @Override
+    public void endpointChanged(EndpointEvent event, String filter) {
+        repository.endpointChanged(event);
+    }
+    
+    @Override
+    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+        endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), matchedFilter);
+    }
+
+    @Override
+    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+        endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint), matchedFilter);
+    }
+
+    private Dictionary<String, String> serviceProperties(String uuid) {
+        String scope = String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, 
+                        RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid);
+        Dictionary<String, String> props = new Hashtable<>();
+        props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, scope);
+        props.put(ClientManager.DISCOVERY_ZOOKEEPER_ID, "true");
+        return props;
+    }
+
+
+}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
deleted file mode 100644
index 1ca8516..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper;
-
-import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListener;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
-import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterestManager;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.util.tracker.ServiceTracker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component
-public class ZooKeeperDiscovery {
-    public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
-
-    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class);
-
-    @Reference
-    private ZooKeeper zkClient;
-    
-    private PublishingEndpointListener endpointListener;
-    private ServiceTracker<?, ?> endpointListenerTracker;
-    private InterestManager imManager;
-    private ZookeeperEndpointRepository repository;
-
-    @Activate
-    public void activate(BundleContext context) {
-        LOG.debug("Starting ZookeeperDiscovery");
-        repository = new ZookeeperEndpointRepository(zkClient);
-        endpointListener = new PublishingEndpointListener(repository);
-        endpointListener.start(context);
-        imManager = new InterestManager(repository);
-        repository.addListener(imManager);
-        endpointListenerTracker = new EndpointListenerTracker(context, imManager);
-        endpointListenerTracker.open();
-    }
-
-    @Deactivate
-    public void deactivate() {
-        LOG.debug("Stopping ZookeeperDiscovery");
-        endpointListener.stop();
-        endpointListenerTracker.close();
-        imManager.close();
-        repository.close();
-    }
-
-}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java
new file mode 100644
index 0000000..37fdada
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.util.List;
+
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(immediate = true)
+public class ZookeeperEndpointListener {
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointListener.class);
+    
+    @Reference
+    private ZooKeeper zk;
+    
+    @Reference
+    private EndpointDescriptionParser parser;
+    
+    @Reference
+    private InterestManager listener;
+    
+    @Reference
+    private ZookeeperEndpointPublisher publisher;
+    
+    public ZookeeperEndpointListener() {
+    }
+    
+    public ZookeeperEndpointListener(ZooKeeper zk, EndpointDescriptionParser parser, InterestManager listener) {
+        this.zk = zk;
+        this.parser = parser;
+        this.listener = listener;
+        activate();
+    }
+    
+    @Activate
+    public void activate() {
+        watchRecursive(ZookeeperEndpointPublisher.PATH_PREFIX);
+    }
+    
+    private void process(WatchedEvent event) {
+        String path = event.getPath();
+        LOG.info("Received event {}", event);
+        switch (event.getType()) {
+        case NodeCreated:
+        case NodeDataChanged:
+        case NodeChildrenChanged:
+            watchRecursive(path);
+            break;
+        case NodeDeleted:
+            listener.handleRemoved(path);
+            break;
+        default:
+            break;
+        }
+    }
+
+    private void watchRecursive(String path) {
+        LOG.info("Watching {}", path);
+        try {
+            EndpointDescription endpoint = read(path);
+            if (endpoint != null) {
+                listener.handleChanged(path, endpoint);
+            }
+            List<String> children = zk.getChildren(path, this::process);
+            if (children == null) {
+                return;
+            }
+            for (String child : children) {
+                String childPath = (path.endsWith("/") ? path : path + "/") + child;
+                watchRecursive(childPath);
+            }
+        } catch (NoNodeException | SessionExpiredException | ConnectionLossException e) {
+            // NoNodeException happens when a node was removed
+            LOG.debug(e.getMessage(), e);
+        } catch (Exception e) {
+            LOG.info(e.getMessage(), e);
+        }
+    }
+
+    EndpointDescription read(String path) throws KeeperException, InterruptedException {
+        Stat stat = new Stat();
+        byte[] data = zk.getData(path, this::process, stat);
+        if (data == null || data.length == 0) {
+            return null;
+        } else {
+            return parser.readEndpoint(new ByteArrayInputStream(data));
+        }
+    }
+
+}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java
new file mode 100644
index 0000000..7757396
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toList;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(service = ZookeeperEndpointPublisher.class)
+public class ZookeeperEndpointPublisher {
+    public static final String PATH_PREFIX = "/osgi/service_registry";
+    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointPublisher.class);
+    private final Map<Integer, String> typeNames = new HashMap<>();
+    
+    @Reference
+    private ZooKeeper zk;
+    
+    @Reference
+    private EndpointDescriptionParser parser;
+    
+    public ZookeeperEndpointPublisher() {
+        typeNames.put(EndpointEvent.ADDED, "added");
+        typeNames.put(EndpointEvent.MODIFIED, "modified");
+        typeNames.put(EndpointEvent.MODIFIED_ENDMATCH, "modified");
+        typeNames.put(EndpointEvent.REMOVED, "removed");
+    }
+    
+    public ZookeeperEndpointPublisher(ZooKeeper zk, EndpointDescriptionParser parser) {
+        this();
+        this.zk = zk;
+        this.parser = parser;
+    }
+
+    @Activate 
+    public void activate() {
+        try {
+            createPath(PATH_PREFIX);
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to create base path");
+        }
+    }
+
+    public void endpointChanged(EndpointEvent event) {
+        try {
+            EndpointDescription endpoint = event.getEndpoint();
+            switch (event.getType()) {
+            case EndpointEvent.ADDED:
+                add(endpoint);
+                break;
+            case EndpointEvent.MODIFIED:
+            case EndpointEvent.MODIFIED_ENDMATCH:
+                modify(endpoint);
+                break;
+            case EndpointEvent.REMOVED:
+                remove(endpoint);
+                break;
+            default:
+                break;
+            }
+        } catch (Exception e) {
+            logException(typeNames.get(event.getType()), event.getEndpoint(), e);
+        }
+    }
+    
+    private void logException(String operation, EndpointDescription endpoint, Exception ex) {
+        String msg = String.format("Exception during %s of endpoint %s", operation, endpoint.getId());
+        LOG.error(msg, ex);
+    }
+
+
+    private void add(EndpointDescription endpoint) throws KeeperException, InterruptedException  {
+        String path = getZooKeeperPath(endpoint.getId());
+        LOG.info("Exporting path: {}, Endpoint: {}", path, endpoint);
+        createBasePath();
+        byte[] data = getData(endpoint);
+        createEphemeralNode(path, data);
+    }
+
+    private void modify(EndpointDescription endpoint) throws KeeperException, InterruptedException {
+        String path = getZooKeeperPath(endpoint.getId());
+        LOG.info("Changing endpoint in zookeeper. Endpoint: {}, Path: {}", endpoint, path);
+        createBasePath();
+        zk.setData(path, getData(endpoint), -1);
+    }
+    
+    private void remove(EndpointDescription endpoint) throws KeeperException, InterruptedException {
+        String path = getZooKeeperPath(endpoint.getId());
+        LOG.info("Removing endpoint in zookeeper. Endpoint: {}, Path: {}", endpoint, path);
+        zk.delete(path, -1);
+    }
+    
+    private boolean notEmpty(String part) {
+        return part != null && !part.isEmpty();
+    }
+
+    static String getZooKeeperPath(String name) {
+        String escaped = name.replace('/', '#');
+        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + escaped;
+    }
+
+    private void createBasePath() throws KeeperException, InterruptedException {
+        String path = ZookeeperEndpointPublisher.getZooKeeperPath("");
+        createPath(path);
+    }
+
+    private byte[] getData(EndpointDescription epd) {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        parser.writeEndpoint(epd, bos);
+        return bos.toByteArray();
+    }
+    
+    private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
+        try {
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        } catch (NodeExistsException nee) {
+            // this sometimes happens after a ZooKeeper node dies and the ephemeral node
+            // that belonged to the old session was not yet deleted. We need to make our
+            // session the owner of the node so it won't get deleted automatically -
+            // we do this by deleting and recreating it ourselves.
+            LOG.info("node for endpoint already exists, recreating: {}", fullPath);
+            try {
+                zk.delete(fullPath, -1);
+            } catch (NoNodeException nne) {
+                // it's a race condition, but as long as it got deleted - it's ok
+            }
+            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+    }
+    
+    private void createPath(String path) throws KeeperException, InterruptedException {
+        List<String> parts = asList(path.split("/")).stream()
+                .filter(this::notEmpty)
+                .collect(toList());
+        StringBuilder current = new StringBuilder();
+        for (String part : parts) {
+            current.append('/');
+            current.append(part);
+            createNode(current.toString());
+        }
+    }
+
+    private void createNode(String path) throws KeeperException, InterruptedException {
+        try {
+            if (zk.exists(path, false) == null) {
+                zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+        } catch (NodeExistsException nee) {
+            // it's not the first node with this path to ever exist - that's normal
+        }
+    }
+
+}
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/package-info.java
similarity index 76%
rename from discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
rename to discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/package-info.java
index 478e41b..1e835fe 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/package-info.java
@@ -16,12 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.aries.rsa.discovery.local;
-
-import junit.framework.TestCase;
-
-public class ActivatorTest extends TestCase {
-
-    public void testActivator() throws Exception {
-    }
-}
+@org.osgi.annotation.bundle.Capability( //
+        namespace = "osgi.remoteserviceadmin.discovery", //
+        attribute = {"protocols:List<String>=zookeeper"}, //
+        version = "1.1.0"
+)
+package org.apache.aries.rsa.discovery.zookeeper;
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
deleted file mode 100644
index 7f352a7..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.publish;
-
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEvent;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Listens for local EndpointEvents using old and new style listeners and publishes changes to 
- * the ZooKeeperEndpointRepository
- */
-@SuppressWarnings("deprecation")
-public class PublishingEndpointListener implements EndpointEventListener, EndpointListener {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
-
-    private ServiceRegistration<?> listenerReg;
-    private ZookeeperEndpointRepository repository;
-
-    public PublishingEndpointListener(ZookeeperEndpointRepository repository) {
-        this.repository = repository;
-    }
-    
-    public void start(BundleContext bctx) {
-        Dictionary<String, String> props = new Hashtable<>();
-        String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
-        props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, 
-                  String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS, 
-                                RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
-        props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
-        String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
-        listenerReg = bctx.registerService(ifAr, this, props);
-    }
-    
-    public void stop() {
-        if (listenerReg != null) {
-            listenerReg.unregister();
-            listenerReg = null;
-        }
-    }
-
-    @Override
-    public void endpointChanged(EndpointEvent event, String filter) {
-        EndpointDescription endpoint = event.getEndpoint();
-        switch (event.getType()) {
-        case EndpointEvent.ADDED:
-            endpointAdded(endpoint, filter);
-            break;
-        case EndpointEvent.REMOVED:
-            endpointRemoved(endpoint, filter);
-            break;
-        case EndpointEvent.MODIFIED:
-            endpointModified(endpoint, filter);
-            break;
-        }
-    }
-    
-    private void endpointModified(EndpointDescription endpoint, String filter) {
-        try {
-            repository.modify(endpoint);
-        } catch (Exception ex) {
-            logException("modification", endpoint, ex);
-        }
-    }
-
-    @Override
-    public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
-        try {
-            repository.add(endpoint);
-        } catch (Exception ex) {
-            logException("adding", endpoint, ex);
-        }
-    }
-
-    @Override
-    public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
-        try {
-            repository.remove(endpoint);
-        } catch (Exception ex) {
-            logException("removal", endpoint, ex);
-        }
-    }
-    
-    private void logException(String operation, EndpointDescription endpoint, Exception ex) {
-        String msg = String.format("Exception during %s of endpoint %s", operation, endpoint.getId());
-        LOG.error(msg, ex);
-    }
-
-}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
deleted file mode 100644
index 5b0d66b..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.repository;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEvent;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperEndpointRepository implements Closeable, Watcher {
-    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointRepository.class);
-    private final ZooKeeper zk;
-    private final EndpointDescriptionParser parser;
-    private EndpointEventListener listener;
-    public static final String PATH_PREFIX = "/osgi/service_registry";
-    
-    private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<>();
-    
-    public ZookeeperEndpointRepository(ZooKeeper zk) {
-        this.zk = zk;
-        this.parser = new EndpointDescriptionParser();
-        try {
-            createPath(PATH_PREFIX);
-        } catch (Exception e) {
-            throw new IllegalStateException("Unable to create base path");
-        }
-        this.registerWatcher();
-    }
-
-    public void addListener(EndpointEventListener listener) {
-        this.listener = listener;
-    }
-
-    /**
-     * Read current endpoint stored at a znode
-     * 
-     * @param path
-     * @return
-     */
-    public EndpointDescription read(String path) {
-        return nodes.get(path);
-    }
-
-    public void add(EndpointDescription endpoint) throws KeeperException, InterruptedException  {
-        String path = getZooKeeperPath(endpoint.getId());
-        LOG.info("Exporting endpoint to zookeeper. Endpoint: {}, Path: {}", endpoint, path);
-        createBasePath();
-        createEphemeralNode(path, getData(endpoint));
-    }
-
-    public void modify(EndpointDescription endpoint) throws KeeperException, InterruptedException {
-        String path = getZooKeeperPath(endpoint.getId());
-        LOG.info("Changing endpoint in zookeeper. Endpoint: {}, Path: {}", endpoint, path);
-        createBasePath();
-        zk.setData(path, getData(endpoint), -1);
-    }
-    
-    public void remove(EndpointDescription endpoint) throws InterruptedException, KeeperException {
-        String path = getZooKeeperPath(endpoint.getId());
-        LOG.info("Removing endpoint in zookeeper. Endpoint: {}, Path: {}", endpoint, path);
-        zk.delete(path, -1);
-    }
-    
-    public Collection<EndpointDescription> getAll() {
-        return nodes.values();
-    }
-
-    /**
-     * Removes nulls and empty strings from the given string array.
-     *
-     * @param strings an array of strings
-     * @return a new array containing the non-null and non-empty
-     *         elements of the original array in the same order
-     */
-    public static List<String> removeEmpty(List<String> strings) {
-        List<String> result = new ArrayList<>();
-        if (strings == null) {
-            return result;
-        }
-        for (String s : strings) {
-            if (s != null && !s.isEmpty()) {
-                result.add(s);
-            }
-        }
-        return result;
-    }
-
-    public static String getZooKeeperPath(String name) {
-        String escaped = name.replace('/', '#');
-        return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + escaped;
-    }
-
-    @Override
-    public void process(WatchedEvent event) {
-        LOG.info("Received event {}", event);
-        switch (event.getType()) {
-        case NodeCreated:
-        case NodeDataChanged:
-        case NodeChildrenChanged:
-            watchRecursive(event.getPath());
-            break;
-        case NodeDeleted:
-            handleRemoved(event.getPath());
-            break;
-        default:
-            break;
-        }
-    }
-
-    @Override
-    public void close() {
-        nodes.clear();
-    }
-
-    private void createBasePath() throws KeeperException, InterruptedException {
-        String path = ZookeeperEndpointRepository.getZooKeeperPath("");
-        createPath(path);
-    }
-
-    private void registerWatcher() {
-        try {
-            watchRecursive(ZookeeperEndpointRepository.PATH_PREFIX);
-        } catch (Exception e) {
-            LOG.info(e.getMessage(), e);
-        }
-    }
-
-    /**
-     * TODO Check if we handle connection losses correctly
-     * @param path
-     */
-    private void watchRecursive(String path) {
-        LOG.info("Watching {}", path);
-        try {
-            handleZNodeChanged(path);
-            List<String> children = zk.getChildren(path, this);
-            if (children == null) {
-                return;
-            }
-            for (String child : children) {
-                String childPath = (path.endsWith("/") ? path : path + "/") + child;
-                watchRecursive(childPath);
-            }
-        } catch (NoNodeException | SessionExpiredException | ConnectionLossException e) {
-            // NoNodeException happens when a node was removed
-            LOG.debug(e.getMessage(), e);
-        } catch (Exception e) {
-            LOG.info(e.getMessage(), e);
-        }
-    }
-
-    private byte[] getData(EndpointDescription epd) {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        parser.writeEndpoint(epd, bos);
-        return bos.toByteArray();
-    }
-    
-    private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
-        try {
-            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-        } catch (NodeExistsException nee) {
-            // this sometimes happens after a ZooKeeper node dies and the ephemeral node
-            // that belonged to the old session was not yet deleted. We need to make our
-            // session the owner of the node so it won't get deleted automatically -
-            // we do this by deleting and recreating it ourselves.
-            LOG.info("node for endpoint already exists, recreating: {}", fullPath);
-            try {
-                zk.delete(fullPath, -1);
-            } catch (NoNodeException nne) {
-                // it's a race condition, but as long as it got deleted - it's ok
-            }
-            zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-        }
-    }
-    
-    private void createPath(String path) throws KeeperException, InterruptedException {
-        StringBuilder current = new StringBuilder();
-        List<String> parts = ZookeeperEndpointRepository.removeEmpty(Arrays.asList(path.split("/")));
-        for (String part : parts) {
-            current.append('/');
-            current.append(part);
-            try {
-                if (zk.exists(current.toString(), false) == null) {
-                    zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-                }
-            } catch (NodeExistsException nee) {
-                // it's not the first node with this path to ever exist - that's normal
-            }
-        }
-    }
-
-    private void handleZNodeChanged(String path) throws KeeperException, InterruptedException {
-        Stat stat = new Stat();
-        byte[] data = zk.getData(path, this, stat);
-        if (data == null || data.length == 0) {
-            return;
-        }
-        EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
-        if (endpoint != null) {
-            handleChanged(path, endpoint);
-        }
-    }
-
-    private void handleRemoved(String path) {
-        EndpointDescription endpoint = nodes.remove(path);
-        EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
-        if (listener != null) {
-            listener.endpointChanged(event, null);
-        }
-    }
-
-    private void handleChanged(String path, EndpointDescription endpoint) {
-        EndpointDescription old = nodes.put(path, endpoint);
-        int type = old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED;
-        EndpointEvent event = new EndpointEvent(type, endpoint);
-        if (listener != null) {
-            listener.endpointChanged(event, null);
-        }
-    }
-
-}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
deleted file mode 100644
index 26910cd..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Filter;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.util.tracker.ServiceTracker;
-
-/**
- * Tracks EndpointListeners and EndpointEventListeners. Delegates to InterestManager to handle them
- */
-@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
-public class EndpointListenerTracker extends ServiceTracker {
-    private final InterestManager imManager;
-
-    public EndpointListenerTracker(BundleContext bctx, InterestManager imManager) {
-        super(bctx, getFilter(), null);
-        this.imManager = imManager;
-    }
-    
-    private static Filter getFilter() {
-        String filterSt = String.format("(|(objectClass=%s)(objectClass=%s))", EndpointEventListener.class.getName(), 
-                EndpointListener.class.getName());
-        try {
-            return FrameworkUtil.createFilter(filterSt);
-        } catch (InvalidSyntaxException e) {
-            throw new IllegalArgumentException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public Object addingService(ServiceReference sref) {
-        Object epListener = super.addingService(sref);
-        imManager.addInterest(sref, epListener);
-        return epListener;
-    }
-
-    @Override
-    public void modifiedService(ServiceReference sref, Object epListener) {
-        // called when an EndpointListener updates its service properties,
-        // e.g. when its interest scope is expanded/reduced
-        imManager.addInterest(sref, epListener);
-    }
-
-    @Override
-    public void removedService(ServiceReference sref, Object epListener) {
-        imManager.removeInterest(sref);
-        super.removedService(sref, epListener);
-    }
-
-}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java
new file mode 100644
index 0000000..985f1b0
--- /dev/null
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Test;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+
+public class InterestManagerTest {
+
+    @Test
+    public void testEndpointListenerTrackerCustomizer() {
+        IMocksControl c = EasyMock.createControl();
+        ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)");
+        ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)");
+        EndpointEventListener epListener1 = c.createMock(EndpointEventListener.class); 
+        EndpointEventListener epListener2 = c.createMock(EndpointEventListener.class); 
+
+        c.replay();
+
+        InterestManager im = new InterestManager();
+        // sref has no scope -> nothing should happen
+        assertEquals(0, im.getInterests().size());
+
+        im.bindEndpointEventListener(sref, epListener1);
+        assertEquals(1, im.getInterests().size());
+
+        im.bindEndpointEventListener(sref, epListener1);
+        assertEquals(1, im.getInterests().size());
+
+        im.bindEndpointEventListener(sref2, epListener2);
+        assertEquals(2, im.getInterests().size());
+
+        im.unbindEndpointEventListener(sref);
+        assertEquals(1, im.getInterests().size());
+
+        im.unbindEndpointEventListener(sref);
+        assertEquals(1, im.getInterests().size());
+
+        im.unbindEndpointEventListener(sref2);
+        assertEquals(0, im.getInterests().size());
+
+        c.verify();
+    }
+
+    @SuppressWarnings("unchecked")
+    private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope) {
+        ServiceReference<EndpointEventListener> sref = c.createMock(ServiceReference.class);
+        expect(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).andReturn(scope).atLeastOnce();
+        expect(sref.getProperty(ClientManager.DISCOVERY_ZOOKEEPER_ID)).andReturn(null).atLeastOnce();
+        return sref;
+    }
+
+}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java
new file mode 100644
index 0000000..815c42a
--- /dev/null
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.Constants;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PublishingEndpointListenerTest {
+    @Mock
+    ZookeeperEndpointPublisher repository;
+
+    @InjectMocks
+    PublishingEndpointListener eli;
+
+    @Test
+    public void testEndpointRemovalAdding() throws KeeperException, InterruptedException {
+        EndpointDescription endpoint = createEndpoint();
+        EndpointEvent event1 = new EndpointEvent(EndpointEvent.ADDED, endpoint);
+        eli.endpointChanged(event1, null);
+        EndpointEvent event2 = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
+        eli.endpointChanged(event2, null);
+        
+        verify(repository).endpointChanged(event1);
+        verify(repository).endpointChanged(event2);
+    }
+
+    private EndpointDescription createEndpoint() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(Constants.OBJECTCLASS, new String[] {"myClass"});
+        props.put(RemoteConstants.ENDPOINT_ID, "http://google.de:80/test/sub");
+        props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "myConfig");
+        return new EndpointDescription(props);
+    }
+}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
similarity index 66%
rename from discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
rename to discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
index a65696b..02f6093 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.aries.rsa.discovery.zookeeper.repository;
+package org.apache.aries.rsa.discovery.zookeeper;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
@@ -35,9 +35,10 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParserImpl;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
@@ -45,46 +46,35 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
 import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceReference;
 import org.osgi.service.remoteserviceadmin.EndpointDescription;
 import org.osgi.service.remoteserviceadmin.EndpointEvent;
 import org.osgi.service.remoteserviceadmin.EndpointEventListener;
 import org.osgi.service.remoteserviceadmin.RemoteConstants;
 
-public class ZookeeperEndpointRepositoryTest {
-    
+@RunWith(MockitoJUnitRunner.class)
+public class ZookeeperDiscoveryTest {
+    final Semaphore semConnected = new Semaphore(0);
+    final Semaphore sem = new Semaphore(0);
     private ZooKeeperServer server;
     private ZooKeeper zk;
     private ServerCnxnFactory factory;
     private List<EndpointEvent> events = new ArrayList<>();
+    @Mock
+    private ServiceReference<EndpointEventListener> sref;
 
     @Before
     public void before() throws IOException, InterruptedException, KeeperException {
-        File target = new File("target");
-        File zookeeperDir = new File(target, "zookeeper");
-        server = new ZooKeeperServer(zookeeperDir, zookeeperDir, 2000);
-        factory = new NIOServerCnxnFactory();
-        int clientPort = getClientPort();
-        factory.configure(new InetSocketAddress(clientPort), 10);
-        factory.startup(server);
-        Watcher watcher = new Watcher() {
-
-            @Override
-            public void process(WatchedEvent event) {
-                System.out.println(event);
-            }
-            
-        };
-        zk = new ZooKeeper("localhost:" + server.getClientPort(), 1000, watcher);
+        startZookeeperServer();
+        zk = new ZooKeeper("localhost:" + server.getClientPort(), 1000, this::process);
         printNodes("/");
     }
     
-    private int getClientPort() throws IOException {
-        try (ServerSocket serverSocket = new ServerSocket(0)) {
-            return serverSocket.getLocalPort();
-        }
-    }
-
     @After
     public void after() throws InterruptedException {
         //zk.close(); // Seems to cause SessionTimeout error 
@@ -93,44 +83,62 @@
 
     @Test
     public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException {
-        final Semaphore sem = new Semaphore(0);
-        EndpointEventListener listener = new EndpointEventListener() {
-            
-            @Override
-            public void endpointChanged(EndpointEvent event, String filter) {
-                events.add(event);
-                sem.release();
-            }
-        };
-        ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk);
-        repository.addListener(listener);
+        EndpointDescriptionParserImpl parser = new EndpointDescriptionParserImpl();
+        ZookeeperEndpointPublisher repository = new ZookeeperEndpointPublisher(zk, parser);
+        repository.activate();
+        InterestManager im = new InterestManager();
+        
+        String scope = "("+ Constants.OBJECTCLASS +"=*)";
+        Mockito.when(sref.getProperty(Mockito.eq(EndpointEventListener.ENDPOINT_LISTENER_SCOPE))).thenReturn(scope);
+        im.bindEndpointEventListener(sref, this::onEndpointChanged);
+        ZookeeperEndpointListener zklistener = new ZookeeperEndpointListener(zk, parser, im);
+        
+        assertThat(semConnected.tryAcquire(1, SECONDS), equalTo(true));
         
         EndpointDescription endpoint = createEndpoint();
-        repository.add(endpoint);
+        repository.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint));
         
-        assertThat(sem.tryAcquire(1000, TimeUnit.SECONDS), equalTo(true));
-
+        assertThat(sem.tryAcquire(100, SECONDS), equalTo(true));
+    
         String path = "/osgi/service_registry/http:##test.de#service1";
-        EndpointDescription ep2 = repository.read(path);
+        EndpointDescription ep2 = zklistener.read(path);
         assertNotNull(ep2);
 
-        repository.remove(endpoint);
-
+        repository.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint));
+    
         assertThat(sem.tryAcquire(1000, TimeUnit.SECONDS), equalTo(true));
         assertThat(events.get(0).getType(), equalTo(EndpointEvent.ADDED));
         assertThat(events.get(1).getType(), equalTo(EndpointEvent.REMOVED));
         assertThat(events.get(0).getEndpoint(), equalTo(endpoint));
         assertThat(events.get(1).getEndpoint(), equalTo(endpoint));
-        
-        repository.close();
+    }
+
+    private void process(WatchedEvent event) {
+        if (event.getState() == KeeperState.SyncConnected) {
+            semConnected.release();
+        }
+        System.out.println(event);
+    }
+
+    private void startZookeeperServer() throws IOException, InterruptedException {
+        File target = new File("target");
+        File zookeeperDir = new File(target, "zookeeper");
+        server = new ZooKeeperServer(zookeeperDir, zookeeperDir, 2000);
+        factory = new NIOServerCnxnFactory();
+        int clientPort = getClientPort();
+        factory.configure(new InetSocketAddress(clientPort), 10);
+        factory.startup(server);
     }
     
-    @Test
-    public void testGetZooKeeperPath() {
-        assertEquals(ZookeeperEndpointRepository.PATH_PREFIX + '/' + "http:##org.example.Test",
-            ZookeeperEndpointRepository.getZooKeeperPath("http://org.example.Test"));
+    private int getClientPort() throws IOException {
+        try (ServerSocket serverSocket = new ServerSocket(0)) {
+            return serverSocket.getLocalPort();
+        }
+    }
 
-        assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(""));
+    private void onEndpointChanged(EndpointEvent event, String filter) {
+        events.add(event);
+        sem.release();
     }
 
     private EndpointDescription createEndpoint() {
@@ -143,7 +151,7 @@
         return endpoint;
     }
 
-    public void printNodes(String path) throws KeeperException, InterruptedException {
+    private void printNodes(String path) throws KeeperException, InterruptedException {
         List<String> children = zk.getChildren(path, false);
         for (String child : children) {
             String newPath = path.endsWith("/") ? path : path + "/";
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java
similarity index 60%
copy from discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
copy to discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java
index 478e41b..31441d3 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java
@@ -16,12 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.aries.rsa.discovery.local;
+package org.apache.aries.rsa.discovery.zookeeper;
 
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
 
-public class ActivatorTest extends TestCase {
+import org.junit.Test;
 
-    public void testActivator() throws Exception {
+public class ZookeeperEndpointPublisherPathTest {
+    
+    @Test
+    public void testGetZooKeeperPath() {
+        assertEquals(ZookeeperEndpointPublisher.PATH_PREFIX + '/' + "http:##org.example.Test",
+            ZookeeperEndpointPublisher.getZooKeeperPath("http://org.example.Test"));
+
+        assertEquals(ZookeeperEndpointPublisher.PATH_PREFIX, ZookeeperEndpointPublisher.getZooKeeperPath(""));
     }
+    
 }
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
deleted file mode 100644
index 3884c95..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.publish;
-
-import static org.easymock.EasyMock.expect;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-import org.osgi.framework.Constants;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEvent;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-
-import junit.framework.TestCase;
-
-public class PublishingEndpointListenerTest extends TestCase {
-
-    private static final String ENDPOINT_PATH = "/osgi/service_registry/http:##google.de:80#test#sub";
-
-    public void testEndpointRemovalAdding() throws KeeperException, InterruptedException {
-        IMocksControl c = EasyMock.createNiceControl();
-
-        ZooKeeper zk = c.createMock(ZooKeeper.class);
-
-        String path = ENDPOINT_PATH;
-        expectCreated(zk, path);
-        expectDeleted(zk, path);
-
-        c.replay();
-
-        ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk);
-        PublishingEndpointListener eli = new PublishingEndpointListener(repository);
-        EndpointDescription endpoint = createEndpoint();
-        eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
-        eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null); // should do nothing
-        eli.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint), null);
-        eli.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint), null); // should do nothing
-
-        c.verify();
-    }
-
-    private void expectCreated(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
-        expect(zk.create(EasyMock.eq(path), 
-                         (byte[])EasyMock.anyObject(), 
-                         EasyMock.eq(Ids.OPEN_ACL_UNSAFE),
-                         EasyMock.eq(CreateMode.EPHEMERAL)))
-            .andReturn("");
-    }
-
-    private void expectDeleted(ZooKeeper zk, String path) throws InterruptedException, KeeperException {
-        zk.delete(EasyMock.eq(path), EasyMock.eq(-1));
-        EasyMock.expectLastCall().once();
-    }
-
-    private EndpointDescription createEndpoint() {
-        Map<String, Object> props = new HashMap<>();
-        props.put(Constants.OBJECTCLASS, new String[] {"myClass"});
-        props.put(RemoteConstants.ENDPOINT_ID, "http://google.de:80/test/sub");
-        props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "myConfig");
-        return new EndpointDescription(props);
-    }
-}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
deleted file mode 100644
index 79b2d8d..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-import org.junit.Test;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-
-public class InterfaceMonitorManagerTest {
-
-    @Test
-    public void testEndpointListenerTrackerCustomizer() {
-        IMocksControl c = EasyMock.createControl();
-        ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)");
-        ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)");
-        ZookeeperEndpointRepository repository = c.createMock(ZookeeperEndpointRepository.class);
-        List<EndpointDescription> endpoints = new ArrayList<>();
-        expect(repository.getAll()).andReturn(endpoints).atLeastOnce();
-        EndpointEventListener epListener1 = c.createMock(EndpointEventListener.class); 
-        EndpointEventListener epListener2 = c.createMock(EndpointEventListener.class); 
-
-        c.replay();
-
-        InterestManager eltc = new InterestManager(repository);
-        // sref has no scope -> nothing should happen
-        assertEquals(0, eltc.getInterests().size());
-
-
-        eltc.addInterest(sref, epListener1);
-        assertEquals(1, eltc.getInterests().size());
-
-        eltc.addInterest(sref, epListener1);
-        assertEquals(1, eltc.getInterests().size());
-
-        eltc.addInterest(sref2, epListener2);
-        assertEquals(2, eltc.getInterests().size());
-
-        eltc.removeInterest(sref);
-        assertEquals(1, eltc.getInterests().size());
-
-        eltc.removeInterest(sref);
-        assertEquals(1, eltc.getInterests().size());
-
-        eltc.removeInterest(sref2);
-        assertEquals(0, eltc.getInterests().size());
-
-        c.verify();
-    }
-
-    @SuppressWarnings("unchecked")
-    private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope) {
-        ServiceReference<EndpointEventListener> sref = c.createMock(ServiceReference.class);
-        expect(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).andReturn(scope).atLeastOnce();
-        expect(sref.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)).andReturn(null).atLeastOnce();
-        return sref;
-    }
-
-}
diff --git a/discovery/zookeeper/src/test/resources/logback.xml b/discovery/zookeeper/src/test/resources/logback.xml
new file mode 100644
index 0000000..7f2cb48
--- /dev/null
+++ b/discovery/zookeeper/src/test/resources/logback.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<configuration>
+  <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%date %level %logger:%line %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <logger name="org.apache.zookeeper" level="WARN" /> 
+
+  <root level="info">
+    <appender-ref ref="console"/>
+  </root>
+</configuration>
\ No newline at end of file
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java
index 0b6aadc..a24b140 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java
@@ -18,20 +18,20 @@
  */
 package org.apache.aries.rsa.itests.felix.tcp;
 
+import static org.awaitility.Awaitility.await;
+
 import java.io.ByteArrayInputStream;
-import java.util.List;
 
 import javax.inject.Inject;
 
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.zookeeper.ZookeeperEndpointPublisher;
 import org.apache.aries.rsa.examples.echotcp.api.EchoService;
 import org.apache.aries.rsa.itests.felix.RsaTestBase;
 import org.apache.aries.rsa.spi.DistributionProvider;
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,10 +42,14 @@
 
 @RunWith(PaxExam.class)
 public class TestDiscoveryExport extends RsaTestBase {
-    private static final String GREETER_ZOOKEEPER_NODE = "/osgi/service_registry";
-
     @Inject
     DistributionProvider tcpProvider;
+    
+    @Inject
+    EndpointDescriptionParser parser;
+    
+    @Inject
+    ZooKeeper zookeeper;
 
     @Configuration
     public static Option[] configure() throws Exception {
@@ -70,44 +74,21 @@
     }
 
     private EndpointDescription getEndpoint() throws Exception {
-        ZooKeeper zk = new ZooKeeper("localhost:" + ZK_PORT, 1000, new DummyWatcher());
-        assertNodeExists(zk, GREETER_ZOOKEEPER_NODE, 10000);
-        String endpointPath = getEndpointPath(zk, GREETER_ZOOKEEPER_NODE);
-        EndpointDescription epd = getEndpointDescription(zk, endpointPath);
-        zk.close();
-        return epd;
+        String endpointName = await("Node exists").until(this::getEndpointPath, Matchers.notNullValue());
+        return getEndpointDescription(zookeeper, ZookeeperEndpointPublisher.PATH_PREFIX + "/" + endpointName);
     }
 
     private EndpointDescription getEndpointDescription(ZooKeeper zk, String endpointPath)
         throws KeeperException, InterruptedException {
         byte[] data = zk.getData(endpointPath, false, null);
         ByteArrayInputStream is = new ByteArrayInputStream(data);
-        return new EndpointDescriptionParser().readEndpoint(is);
+        return parser.readEndpoint(is);
     }
 
-    private String getEndpointPath(ZooKeeper zk, String servicePath) throws KeeperException, InterruptedException {
-        List<String> children = zk.getChildren(servicePath, false);
-        return servicePath + "/" + children.iterator().next();
-    }
-
-    private void assertNodeExists(ZooKeeper zk, String zNode, int timeout) {
-        long endTime = System.currentTimeMillis() + timeout;
-        Stat stat = null;
-        while (stat == null && System.currentTimeMillis() < endTime) {
-            try {
-                stat = zk.exists(zNode, null);
-                Thread.sleep(200);
-            } catch (Exception e) {
-                // Ignore
-            }
-        }
-        Assert.assertNotNull("ZooKeeper node " + zNode + " was not found", stat);
-    }
-    
-    private final class DummyWatcher implements Watcher {
-        @Override
-        public void process(WatchedEvent event) {
-        }
+    private String getEndpointPath() throws KeeperException, InterruptedException {
+        return zookeeper.getChildren(ZookeeperEndpointPublisher.PATH_PREFIX, false).stream()
+                .findFirst()
+                .orElse(null);
     }
 
 }
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java
index e735f55..4aea83f 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java
@@ -31,9 +31,8 @@
 
 import javax.inject.Inject;
 
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
+import org.apache.aries.rsa.discovery.zookeeper.ZookeeperEndpointPublisher;
 import org.apache.aries.rsa.itests.felix.RsaTestBase;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.Configuration;
@@ -49,10 +48,14 @@
 @RunWith(PaxExam.class)
 public class TestDiscoveryImport extends RsaTestBase {
     @Inject
-    ZookeeperEndpointRepository repository;
+    ZookeeperEndpointPublisher publisher;
     
     @Inject
     BundleContext context;
+
+    private Semaphore sem = new Semaphore(0);;
+
+    private List<EndpointEvent> events = new ArrayList<>();
     
     @Configuration
     public static Option[] configure() throws Exception {
@@ -66,30 +69,32 @@
         };
     }
 
-    @Ignore
     @Test
     public void testDiscoveryImport() throws Exception {
-        final Semaphore sem = new Semaphore(0);
-        final List<EndpointEvent> events = new ArrayList<>();
-        EndpointEventListener listener = new EndpointEventListener() {
-            
-            @Override
-            public void endpointChanged(EndpointEvent event, String filter) {
-                events.add(event);
-                sem.release();
-            }
-        };
+        context.registerService(EndpointEventListener.class, this::endpointChanged, listenerProps());
+        EndpointDescription endpoint = createEndpoint();
+        publisher.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint));
+        assertTrue(sem.tryAcquire(10, TimeUnit.SECONDS));
+        //assertThat(events.get(0), samePropertyValuesAs(new EndpointEvent(EndpointEvent.ADDED, endpoint)));
+    }
+
+    private Dictionary<String, Object> listenerProps() {
         Dictionary<String, Object> eprops = new Hashtable<>();
         eprops.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, "(objectClass=*)");
-        context.registerService(EndpointEventListener.class, listener, eprops);
+        return eprops;
+    }
+
+    private EndpointDescription createEndpoint() {
         Map<String, Object> props = new HashMap<>();
         props.put(Constants.OBJECTCLASS, new String[]{"my"});
         props.put(RemoteConstants.ENDPOINT_ID, "myid");
         props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "myconfig");
         EndpointDescription endpoint = new EndpointDescription(props);
-        repository.add(endpoint);
-        assertTrue(sem.tryAcquire(10, TimeUnit.SECONDS));
-        //assertThat(events.get(0), samePropertyValuesAs(new EndpointEvent(EndpointEvent.ADDED, endpoint)));
+        return endpoint;
     }
 
+    public void endpointChanged(EndpointEvent event, String filter) {
+        events.add(event);
+        sem.release();
+    }
 }
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java
index 78598a9..09aa6fd 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java
@@ -19,10 +19,10 @@
  */
 
 
+import static org.awaitility.Awaitility.await;
+
 import java.io.IOException;
 import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeoutException;
 
 import javax.inject.Inject;
 
@@ -30,12 +30,13 @@
 import org.apache.aries.rsa.itests.felix.RsaTestBase;
 import org.apache.aries.rsa.itests.felix.ServerConfiguration;
 import org.apache.aries.rsa.itests.felix.TwoContainerPaxExam;
-import org.junit.Assert;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.ops4j.pax.exam.Configuration;
 import org.ops4j.pax.exam.Option;
 import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
 import org.osgi.framework.ServiceReference;
 
 @RunWith(TwoContainerPaxExam.class)
@@ -66,48 +67,14 @@
                 configZKDiscovery()
         };
     }
-    
-    public <T> T tryTo(String message, Callable<T> func) throws TimeoutException {
-        return tryTo(message, func, 5000);
-    }
-    
-    public <T> T tryTo(String message, Callable<T> func, long timeout) throws TimeoutException {
-        Throwable lastException = null;
-        long startTime = System.currentTimeMillis();
-        while (System.currentTimeMillis() - startTime < timeout) {
-            try {
-                T result = func.call();
-                if (result != null) {
-                    return result;
-                }
-                lastException = null;
-            } catch (Throwable e) {
-                lastException = e;
-            }
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException e) {
-                continue;
-            }
-        }
-        TimeoutException ex = new TimeoutException("Timeout while trying to " + message);
-        if (lastException != null) {
-            ex.addSuppressed(lastException);
-        }
-        throw ex;
-    }
 
     @Test
     public void testFind() throws Exception {
-        ServiceReference<EchoService> ref = tryTo("get EchoService", new Callable<ServiceReference<EchoService>>() {
+        await().until(() -> getEchoServices().size(), Matchers.equalTo(1));
+    }
 
-            @Override
-            public ServiceReference<EchoService> call() throws Exception {
-                Collection<ServiceReference<EchoService>> refs = context.getServiceReferences(EchoService.class, null);
-                return (refs.size() > 0)? refs.iterator().next() : null;
-            }
-        }, 10000);
-        Assert.assertNotNull(ref);
+    private Collection<ServiceReference<EchoService>> getEchoServices() throws InvalidSyntaxException {
+        return context.getServiceReferences(EchoService.class, null);
     }
 
 }
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
index 2a038b8..7641219 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
@@ -58,7 +58,7 @@
                 rsaCore(),
                 rsaDiscoveryZookeeper(),
                 rsaProviderTcp(),
-                RsaTestBase.echoTcpConsumer(),
+                echoTcpConsumer(),
                 configZKDiscovery()
         };
     }
diff --git a/itests/felix/src/test/resources/logback.xml b/itests/felix/src/test/resources/logback.xml
index f4696e4..3b7921f 100644
--- a/itests/felix/src/test/resources/logback.xml
+++ b/itests/felix/src/test/resources/logback.xml
@@ -20,10 +20,12 @@
 <configuration>
   <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
-      <pattern>%date %level %logger [%file : %line] %msg - %mdc %n</pattern>
+      <pattern>%date{HH:mm:ss} %level %logger{40}:%line - %msg%n</pattern>
     </encoder>
   </appender>
 
+  <logger name="org.apache.zookeeper" level="WARN" /> 
+
   <root level="info">
     <appender-ref ref="console"/>
   </root>
diff --git a/parent/pom.xml b/parent/pom.xml
index ab071f9..f65aac2 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -58,15 +58,20 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>osgi.annotation</artifactId>
+            <version>7.0.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <scope>provided</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-jdk14</artifactId>
-            <scope>test</scope>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.2.3</version>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
@@ -234,7 +239,7 @@
                 <plugin>
                     <groupId>org.apache.felix</groupId>
                     <artifactId>maven-bundle-plugin</artifactId>
-                    <version>3.5.1</version>
+                    <version>4.2.0</version>
                     <extensions>true</extensions>
                 </plugin>
                 <plugin>
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java b/spi/src/main/java/org/apache/aries/rsa/spi/EndpointDescriptionParser.java
similarity index 65%
copy from discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
copy to spi/src/main/java/org/apache/aries/rsa/spi/EndpointDescriptionParser.java
index 478e41b..caa5d01 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
+++ b/spi/src/main/java/org/apache/aries/rsa/spi/EndpointDescriptionParser.java
@@ -16,12 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.aries.rsa.discovery.local;
+package org.apache.aries.rsa.spi;
 
-import junit.framework.TestCase;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
 
-public class ActivatorTest extends TestCase {
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
 
-    public void testActivator() throws Exception {
-    }
+public interface EndpointDescriptionParser {
+    
+    List<EndpointDescription> readEndpoints(InputStream is);
+    
+    EndpointDescription readEndpoint(InputStream is);
+    
+    void writeEndpoint(EndpointDescription epd, OutputStream os);
 }
diff --git a/spi/src/main/java/org/apache/aries/rsa/spi/packageinfo b/spi/src/main/java/org/apache/aries/rsa/spi/packageinfo
index c72722a..1da3be9 100644
--- a/spi/src/main/java/org/apache/aries/rsa/spi/packageinfo
+++ b/spi/src/main/java/org/apache/aries/rsa/spi/packageinfo
@@ -16,4 +16,4 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-version 1.0.0
+version 1.1.0