ARIES-1780 - Redesign of zookeeper discovery using DS
diff --git a/.gitignore b/.gitignore
index ca2256e..9711ac3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,4 +11,5 @@
*.md.html
.idea/
generated/
+.vscode/
diff --git a/discovery/local/bnd.bnd b/discovery/local/bnd.bnd
index 203f6a1..d20f035 100644
--- a/discovery/local/bnd.bnd
+++ b/discovery/local/bnd.bnd
@@ -15,13 +15,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-Bundle-Activator: org.apache.aries.rsa.discovery.local.Activator
Private-Package: org.apache.aries.rsa.discovery.local
Export-Package: \
- org.apache.aries.rsa.discovery.endpoint,\
org.osgi.xmlns.rsa.v1_0
Provide-Capability: osgi.extender;osgi.extender="osgi.remoteserviceadmin";\
version:Version="1.1.0";\
uses:="org.osgi.service.remoteserviceadmin",\
osgi.remoteserviceadmin.discovery;\
protocols:List<String>="local"; version:Version=1.1.0
+-privatepackage: org.apache.aries.rsa.discovery.endpoint
\ No newline at end of file
diff --git a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParser.java b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserImpl.java
similarity index 94%
rename from discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParser.java
rename to discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserImpl.java
index 9fcb8b4..5acd583 100644
--- a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParser.java
+++ b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserImpl.java
@@ -33,16 +33,19 @@
import javax.xml.transform.Source;
import javax.xml.transform.stream.StreamSource;
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
+import org.osgi.service.component.annotations.Component;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionType;
import org.osgi.xmlns.rsa.v1_0.EndpointDescriptionsType;
import org.osgi.xmlns.rsa.v1_0.ObjectFactory;
import org.osgi.xmlns.rsa.v1_0.PropertyType;
-public class EndpointDescriptionParser {
+@Component
+public class EndpointDescriptionParserImpl implements EndpointDescriptionParser {
private JAXBContext jaxbContext;
- public EndpointDescriptionParser() {
+ public EndpointDescriptionParserImpl() {
try {
jaxbContext = JAXBContext.newInstance(EndpointDescriptionsType.class);
} catch (JAXBException e) {
diff --git a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/Activator.java b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/Activator.java
deleted file mode 100644
index 3e9a187..0000000
--- a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/Activator.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.local;
-
-import org.osgi.framework.BundleActivator;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.osgi.util.tracker.ServiceTracker;
-
-public class Activator implements BundleActivator {
- private ServiceTracker<EndpointEventListener, EndpointEventListener> listenerTracker;
- private LocalDiscovery localDiscovery;
-
- public void start(BundleContext context) {
- localDiscovery = new LocalDiscovery();
- listenerTracker = new EPListenerTracker(context, localDiscovery);
- listenerTracker.open();
- localDiscovery.processExistingBundles(context.getBundles());
- context.addBundleListener(localDiscovery);
- }
-
- public void stop(BundleContext context) {
- listenerTracker.close();
- context.removeBundleListener(localDiscovery);
- }
-
- private final class EPListenerTracker extends ServiceTracker<EndpointEventListener, EndpointEventListener> {
- private final LocalDiscovery localDiscovery;
-
- private EPListenerTracker(BundleContext context, LocalDiscovery localDiscovery) {
- super(context, EndpointEventListener.class, null);
- this.localDiscovery = localDiscovery;
- }
-
- @Override
- public EndpointEventListener addingService(ServiceReference<EndpointEventListener> reference) {
- EndpointEventListener service = super.addingService(reference);
- localDiscovery.addListener(reference, service);
- return service;
- }
-
- @Override
- public void modifiedService(ServiceReference<EndpointEventListener> reference, EndpointEventListener service) {
- super.modifiedService(reference, service);
- localDiscovery.removeListener(service);
-
- // This may cause duplicate registrations of remote services,
- // but that's fine and should be filtered out on another level.
- // See Remote Service Admin spec section 122.6.3
- localDiscovery.addListener(reference, service);
- }
-
- @Override
- public void removedService(ServiceReference<EndpointEventListener> reference, EndpointEventListener service) {
- super.removedService(reference, service);
- localDiscovery.removeListener(service);
- }
- }
-
-
-}
diff --git a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/EndpointDescriptionBundleParser.java b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/EndpointDescriptionBundleParser.java
index 1484dcb..1929b6c 100644
--- a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/EndpointDescriptionBundleParser.java
+++ b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/EndpointDescriptionBundleParser.java
@@ -25,7 +25,7 @@
import java.util.Enumeration;
import java.util.List;
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParserImpl;
import org.osgi.framework.Bundle;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.slf4j.Logger;
@@ -37,10 +37,10 @@
private static final String REMOTE_SERVICES_HEADER_NAME = "Remote-Service";
private static final String REMOTE_SERVICES_DIRECTORY = "OSGI-INF/remote-service/";
- private EndpointDescriptionParser parser;
+ private EndpointDescriptionParserImpl parser;
public EndpointDescriptionBundleParser() {
- parser = new EndpointDescriptionParser();
+ parser = new EndpointDescriptionParserImpl();
}
public List<EndpointDescription> getAllEndpointDescriptions(Bundle b) {
@@ -57,7 +57,7 @@
return elements;
}
- Enumeration<URL> getEndpointDescriptionURLs(Bundle b) {
+ private Enumeration<URL> getEndpointDescriptionURLs(Bundle b) {
String origDir = getRemoteServicesDir(b);
// Split origDir into dir and file pattern
diff --git a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java
index adf98b4..22b1f2d 100644
--- a/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java
+++ b/discovery/local/src/main/java/org/apache/aries/rsa/discovery/local/LocalDiscovery.java
@@ -31,15 +31,21 @@
import org.apache.aries.rsa.util.StringPlus;
import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
import org.osgi.framework.BundleListener;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+@Component(immediate = true)
public class LocalDiscovery implements BundleListener {
// this is effectively a set which allows for multiple service descriptions with the
@@ -53,6 +59,18 @@
public LocalDiscovery() {
this.bundleParser = new EndpointDescriptionBundleParser();
}
+
+ @Activate
+ public void activate(BundleContext context) {
+ Bundle[] bundles = context.getBundles();
+ processExistingBundles(bundles);
+ context.addBundleListener(this);
+ }
+
+ @Deactivate
+ public void deactivate(BundleContext context) {
+ context.removeBundleListener(this);
+ }
public void processExistingBundles(Bundle[] bundles) {
if (bundles == null) {
@@ -66,7 +84,8 @@
}
}
- void addListener(ServiceReference<EndpointEventListener> endpointListenerRef, EndpointEventListener endpointListener) {
+ @Reference
+ void bindListener(ServiceReference<EndpointEventListener> endpointListenerRef, EndpointEventListener endpointListener) {
List<String> filters = StringPlus.normalize(endpointListenerRef.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE));
if (filters.isEmpty()) {
return;
@@ -93,7 +112,7 @@
* itself to clean up any orphans. See Remote Service Admin spec 122.6.3
* @param endpointListener
*/
- void removeListener(EndpointEventListener endpointListener) {
+ void unbindListener(EndpointEventListener endpointListener) {
synchronized (listenerToFilters) {
Collection<String> filters = listenerToFilters.remove(endpointListener);
if (filters == null) {
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserTest.java b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserTest.java
index 12ead94..9dcefc0 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserTest.java
+++ b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/EndpointDescriptionParserTest.java
@@ -31,7 +31,7 @@
@Test
public void testEndpointDescriptionsFromURL() throws IOException {
URL ed1URL = getClass().getResource("/ed1.xml");
- List<EndpointDescription> edElements = new EndpointDescriptionParser().
+ List<EndpointDescription> edElements = new EndpointDescriptionParserImpl().
readEndpoints(ed1URL.openStream());
Assert.assertEquals(4, edElements.size());
}
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/PropertiesMapperTest.java b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/PropertiesMapperTest.java
index ffd1dbc..31fa1b7 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/PropertiesMapperTest.java
+++ b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/endpoint/PropertiesMapperTest.java
@@ -80,7 +80,7 @@
ByteArrayOutputStream bos = new ByteArrayOutputStream();
EndpointDescription epd = new EndpointDescription(m);
- new EndpointDescriptionParser().writeEndpoint(epd, bos);
+ new EndpointDescriptionParserImpl().writeEndpoint(epd, bos);
byte[] epData = bos.toByteArray();
System.out.println(new String(epData));
URL edURL = getClass().getResource("/ed2-generated.xml");
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/LocalDiscoveryTest.java b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/LocalDiscoveryTest.java
index 59b3c3f..20afdf6 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/LocalDiscoveryTest.java
+++ b/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/LocalDiscoveryTest.java
@@ -103,7 +103,7 @@
endpointListener.endpointChanged(EasyMock.anyObject(EndpointEvent.class), EasyMock.eq("(objectClass=*)"));
EasyMock.expectLastCall();
EasyMock.replay(endpointListener);
- ld.addListener(sr, endpointListener);
+ ld.bindListener(sr, endpointListener);
// Start the bundle
BundleEvent be = new BundleEvent(BundleEvent.STARTED, bundle);
@@ -148,7 +148,7 @@
// Add the EndpointListener Service
assertEquals("Precondition failed", 0, ld.listenerToFilters.size());
assertEquals("Precondition failed", 0, ld.filterToListeners.size());
- ld.addListener(sr, el);
+ ld.bindListener(sr, el);
assertEquals(1, ld.listenerToFilters.size());
assertEquals(Collections.singletonList("(objectClass=org.example.ClassA)"), ld.listenerToFilters.get(el));
@@ -175,8 +175,8 @@
}).times(2);
EasyMock.replay(el);
- ld.removeListener(el);
- ld.addListener(sr2, el);
+ ld.unbindListener(el);
+ ld.bindListener(sr2, el);
assertEquals(1, ld.listenerToFilters.size());
assertEquals(Arrays.asList("(|(objectClass=org.example.ClassA)(objectClass=org.example.ClassB))"),
ld.listenerToFilters.get(el));
@@ -189,7 +189,7 @@
assertEquals(expectedEndpoints, actualEndpoints);
// Remove the EndpointListener Service
- ld.removeListener(el);
+ ld.unbindListener(el);
assertEquals(0, ld.listenerToFilters.size());
assertEquals(0, ld.filterToListeners.size());
}
@@ -215,7 +215,7 @@
assertEquals("Precondition failed", 0, ld.listenerToFilters.size());
assertEquals("Precondition failed", 0, ld.filterToListeners.size());
- ld.addListener(sr, endpointListener);
+ ld.bindListener(sr, endpointListener);
assertEquals(1, ld.listenerToFilters.size());
assertEquals(Collections.singletonList("(objectClass=Aaaa)"), ld.listenerToFilters.get(endpointListener));
@@ -235,7 +235,7 @@
EndpointEventListener endpointListener2 = EasyMock.createMock(EndpointEventListener.class);
EasyMock.replay(endpointListener2);
- ld.addListener(sr2, endpointListener2);
+ ld.bindListener(sr2, endpointListener2);
assertEquals(2, ld.listenerToFilters.size());
assertEquals(Collections.singletonList("(objectClass=Aaaa)"), ld.listenerToFilters.get(endpointListener));
@@ -260,7 +260,7 @@
EndpointEventListener endpointListener3 = EasyMock.createMock(EndpointEventListener.class);
EasyMock.replay(endpointListener3);
- ld.addListener(sr3, endpointListener3);
+ ld.bindListener(sr3, endpointListener3);
assertEquals(3, ld.listenerToFilters.size());
assertEquals(Collections.singletonList("(objectClass=Aaaa)"), ld.listenerToFilters.get(endpointListener));
@@ -287,11 +287,11 @@
assertEquals(1, ld.listenerToFilters.size());
assertEquals(2, ld.filterToListeners.size());
assertEquals(1, ld.filterToListeners.values().iterator().next().size());
- ld.removeListener(EasyMock.createMock(EndpointEventListener.class));
+ ld.unbindListener(EasyMock.createMock(EndpointEventListener.class));
assertEquals(1, ld.listenerToFilters.size());
assertEquals(2, ld.filterToListeners.size());
assertEquals(1, ld.filterToListeners.values().iterator().next().size());
- ld.removeListener(endpointListener);
+ ld.unbindListener(endpointListener);
assertEquals(0, ld.listenerToFilters.size());
assertEquals(0, ld.filterToListeners.size());
}
diff --git a/discovery/zookeeper/bnd.bnd b/discovery/zookeeper/bnd.bnd
index 4a8cc29..4f06393 100644
--- a/discovery/zookeeper/bnd.bnd
+++ b/discovery/zookeeper/bnd.bnd
@@ -16,3 +16,4 @@
# under the License.
Provide-Capability: osgi.remoteserviceadmin.discovery;\
protocols:List<String>="zookeeper"; version:Version=1.1.0
+Export-Package: org.apache.aries.rsa.discovery.zookeeper
\ No newline at end of file
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
index 9ba3fd2..e6bcf6f 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ClientManager.java
@@ -18,14 +18,15 @@
*/
package org.apache.aries.rsa.discovery.zookeeper;
+import static java.util.concurrent.CompletableFuture.runAsync;
+
import java.io.IOException;
import java.util.Hashtable;
-import java.util.concurrent.CompletableFuture;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ZooTrace;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
@@ -40,16 +41,16 @@
@Component(//
service = ClientManager.class,
immediate = true,
- configurationPid = "org.apache.aries.rsa.discovery.zookeeper", //
+ configurationPid = ClientManager.DISCOVERY_ZOOKEEPER_ID, //
configurationPolicy = ConfigurationPolicy.REQUIRE //
)
public class ClientManager implements Watcher {
-
+ public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.aries.rsa.discovery.zookeeper";
private static final Logger LOG = LoggerFactory.getLogger(ClientManager.class);
private ZooKeeper zkClient;
private DiscoveryConfig config;
- private ServiceRegistration<ZooKeeper> reg;
+ private volatile ServiceRegistration<ZooKeeper> reg;
private BundleContext context;
@Activate
@@ -81,11 +82,7 @@
if (reg != null) {
reg.unregister();
}
- CompletableFuture.runAsync(new Runnable() {
- public void run() {
- closeClient();
- }
- });
+ runAsync(this::closeClient);
}
private void closeClient() {
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterestManager.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java
similarity index 63%
rename from discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterestManager.java
rename to discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java
index 6e92c78..41bce86 100644
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterestManager.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/InterestManager.java
@@ -16,16 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
+package org.apache.aries.rsa.discovery.zookeeper;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
import org.apache.aries.rsa.util.StringPlus;
import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
@@ -39,10 +42,12 @@
* Events from repository are then forwarded to all interested EndpointEventListeners.
*/
@SuppressWarnings({"deprecation", "rawtypes"})
-public class InterestManager implements EndpointEventListener {
+@Component(service = InterestManager.class)
+public class InterestManager {
private static final Logger LOG = LoggerFactory.getLogger(InterestManager.class);
- private final ZookeeperEndpointRepository repository;
+ private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<>();
+
private final Map<ServiceReference, Interest> interests = new ConcurrentHashMap<>();
protected static class Interest {
@@ -50,11 +55,51 @@
Object epListener;
}
- public InterestManager(ZookeeperEndpointRepository repository) {
- this.repository = repository;
+ @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
+ public void bindEndpointEventListener(ServiceReference<EndpointEventListener> sref, EndpointEventListener epListener) {
+ addInterest(sref, epListener);
+ }
+
+ public void updatedEndpointEventListener(ServiceReference<EndpointEventListener> sref, EndpointEventListener epListener) {
+ addInterest(sref, epListener);
+ }
+
+ public void unbindEndpointEventListener(ServiceReference<EndpointEventListener> sref) {
+ removeInterest(sref);
}
- public void addInterest(ServiceReference<?> sref, Object epListener) {
+ @Reference(cardinality = ReferenceCardinality.MULTIPLE, policy = ReferencePolicy.DYNAMIC)
+ public void bindEndpointListener(ServiceReference<EndpointListener> sref, EndpointListener epListener) {
+ addInterest(sref, epListener);
+ }
+
+ public void updatedEndpointListener(ServiceReference<EndpointListener> sref, EndpointListener epListener) {
+ addInterest(sref, epListener);
+ }
+
+ public void unbindEndpointListener(ServiceReference<EndpointListener> sref) {
+ removeInterest(sref);
+ }
+
+ private void removeInterest(ServiceReference<?> sref) {
+ if (interests.containsKey(sref)) {
+ List<String> scopes = getScopes(sref);
+ LOG.info("removing interests: {}", scopes);
+ interests.remove(sref);
+ }
+ }
+
+ /**
+ * Read current endpoint stored at a znode
+ *
+ * @param path
+ * @return
+ */
+ EndpointDescription read(String path) {
+ return nodes.get(path);
+ }
+
+ private void addInterest(ServiceReference<?> sref, Object epListener) {
if (isOurOwnEndpointEventListener(sref)) {
LOG.debug("Skipping our own EndpointEventListener");
return;
@@ -71,28 +116,40 @@
interest.scopes = scopes;
interests.put(sref, interest);
sendExistingEndpoints(scopes, epListener);
+ } else {
+ interest.scopes = scopes;
+ sendExistingEndpoints(scopes, epListener);
}
}
private void sendExistingEndpoints(List<String> scopes, Object epListener) {
- for (EndpointDescription endpoint : repository.getAll()) {
+ for (EndpointDescription endpoint : nodes.values()) {
EndpointEvent event = new EndpointEvent(EndpointEvent.ADDED, endpoint);
notifyListener(event, scopes, epListener);
}
}
- private static boolean isOurOwnEndpointEventListener(ServiceReference<?> EndpointEventListener) {
+ private static boolean isOurOwnEndpointEventListener(ServiceReference<?> endpointEventListener) {
return Boolean.parseBoolean(String.valueOf(
- EndpointEventListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
+ endpointEventListener.getProperty(ClientManager.DISCOVERY_ZOOKEEPER_ID)));
+ }
+
+ public void handleRemoved(String path) {
+ EndpointDescription endpoint = nodes.remove(path);
+ if (endpoint != null) {
+ EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
+ endpointChanged(event);
+ }
}
- public void removeInterest(ServiceReference<EndpointEventListener> epListenerRef) {
- LOG.info("removing EndpointEventListener interests: {}", epListenerRef);
- interests.remove(epListenerRef);
+ public void handleChanged(String path, EndpointDescription endpoint) {
+ EndpointDescription old = nodes.put(path, endpoint);
+ int type = old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED;
+ EndpointEvent event = new EndpointEvent(type, endpoint);
+ endpointChanged(event);
}
- @Override
- public void endpointChanged(EndpointEvent event, String filter) {
+ private void endpointChanged(EndpointEvent event) {
for (Interest interest : interests.values()) {
notifyListener(event, interest.scopes, interest.epListener);
}
@@ -146,7 +203,9 @@
}
}
+ @Deactivate
public synchronized void close() {
+ nodes.clear();
interests.clear();
}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java
new file mode 100644
index 0000000..89e1556
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListener.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+import org.osgi.service.remoteserviceadmin.EndpointListener;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+/**
+ * Listens for local EndpointEvents using old and new style listeners and publishes changes to
+ * the ZooKeeperEndpointRepository
+ */
+@SuppressWarnings("deprecation")
+@Component(service = {}, immediate = true)
+public class PublishingEndpointListener implements EndpointEventListener, EndpointListener {
+
+ private ServiceRegistration<?> listenerReg;
+
+ @Reference
+ private ZookeeperEndpointPublisher repository;
+
+ @Activate
+ public void start(BundleContext bctx) {
+ String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
+ String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
+ Dictionary<String, String> props = serviceProperties(uuid);
+ listenerReg = bctx.registerService(ifAr, this, props);
+ }
+
+ @Deactivate
+ public void stop() {
+ listenerReg.unregister();
+ }
+
+ @Override
+ public void endpointChanged(EndpointEvent event, String filter) {
+ repository.endpointChanged(event);
+ }
+
+ @Override
+ public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
+ endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), matchedFilter);
+ }
+
+ @Override
+ public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
+ endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint), matchedFilter);
+ }
+
+ private Dictionary<String, String> serviceProperties(String uuid) {
+ String scope = String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS,
+ RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid);
+ Dictionary<String, String> props = new Hashtable<>();
+ props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, scope);
+ props.put(ClientManager.DISCOVERY_ZOOKEEPER_ID, "true");
+ return props;
+ }
+
+
+}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
deleted file mode 100644
index 1ca8516..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZooKeeperDiscovery.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper;
-
-import org.apache.aries.rsa.discovery.zookeeper.publish.PublishingEndpointListener;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.apache.aries.rsa.discovery.zookeeper.subscribe.EndpointListenerTracker;
-import org.apache.aries.rsa.discovery.zookeeper.subscribe.InterestManager;
-import org.apache.zookeeper.ZooKeeper;
-import org.osgi.framework.BundleContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.util.tracker.ServiceTracker;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Component
-public class ZooKeeperDiscovery {
- public static final String DISCOVERY_ZOOKEEPER_ID = "org.apache.cxf.dosgi.discovery.zookeeper";
-
- private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDiscovery.class);
-
- @Reference
- private ZooKeeper zkClient;
-
- private PublishingEndpointListener endpointListener;
- private ServiceTracker<?, ?> endpointListenerTracker;
- private InterestManager imManager;
- private ZookeeperEndpointRepository repository;
-
- @Activate
- public void activate(BundleContext context) {
- LOG.debug("Starting ZookeeperDiscovery");
- repository = new ZookeeperEndpointRepository(zkClient);
- endpointListener = new PublishingEndpointListener(repository);
- endpointListener.start(context);
- imManager = new InterestManager(repository);
- repository.addListener(imManager);
- endpointListenerTracker = new EndpointListenerTracker(context, imManager);
- endpointListenerTracker.open();
- }
-
- @Deactivate
- public void deactivate() {
- LOG.debug("Stopping ZookeeperDiscovery");
- endpointListener.stop();
- endpointListenerTracker.close();
- imManager.close();
- repository.close();
- }
-
-}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java
new file mode 100644
index 0000000..37fdada
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointListener.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import java.io.ByteArrayInputStream;
+import java.util.List;
+
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(immediate = true)
+public class ZookeeperEndpointListener {
+ private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointListener.class);
+
+ @Reference
+ private ZooKeeper zk;
+
+ @Reference
+ private EndpointDescriptionParser parser;
+
+ @Reference
+ private InterestManager listener;
+
+ @Reference
+ private ZookeeperEndpointPublisher publisher;
+
+ public ZookeeperEndpointListener() {
+ }
+
+ public ZookeeperEndpointListener(ZooKeeper zk, EndpointDescriptionParser parser, InterestManager listener) {
+ this.zk = zk;
+ this.parser = parser;
+ this.listener = listener;
+ activate();
+ }
+
+ @Activate
+ public void activate() {
+ watchRecursive(ZookeeperEndpointPublisher.PATH_PREFIX);
+ }
+
+ private void process(WatchedEvent event) {
+ String path = event.getPath();
+ LOG.info("Received event {}", event);
+ switch (event.getType()) {
+ case NodeCreated:
+ case NodeDataChanged:
+ case NodeChildrenChanged:
+ watchRecursive(path);
+ break;
+ case NodeDeleted:
+ listener.handleRemoved(path);
+ break;
+ default:
+ break;
+ }
+ }
+
+ private void watchRecursive(String path) {
+ LOG.info("Watching {}", path);
+ try {
+ EndpointDescription endpoint = read(path);
+ if (endpoint != null) {
+ listener.handleChanged(path, endpoint);
+ }
+ List<String> children = zk.getChildren(path, this::process);
+ if (children == null) {
+ return;
+ }
+ for (String child : children) {
+ String childPath = (path.endsWith("/") ? path : path + "/") + child;
+ watchRecursive(childPath);
+ }
+ } catch (NoNodeException | SessionExpiredException | ConnectionLossException e) {
+ // NoNodeException happens when a node was removed
+ LOG.debug(e.getMessage(), e);
+ } catch (Exception e) {
+ LOG.info(e.getMessage(), e);
+ }
+ }
+
+ EndpointDescription read(String path) throws KeeperException, InterruptedException {
+ Stat stat = new Stat();
+ byte[] data = zk.getData(path, this::process, stat);
+ if (data == null || data.length == 0) {
+ return null;
+ } else {
+ return parser.readEndpoint(new ByteArrayInputStream(data));
+ }
+ }
+
+}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java
new file mode 100644
index 0000000..7757396
--- /dev/null
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisher.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import static java.util.Arrays.asList;
+import static java.util.stream.Collectors.toList;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Component(service = ZookeeperEndpointPublisher.class)
+public class ZookeeperEndpointPublisher {
+ public static final String PATH_PREFIX = "/osgi/service_registry";
+ private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointPublisher.class);
+ private final Map<Integer, String> typeNames = new HashMap<>();
+
+ @Reference
+ private ZooKeeper zk;
+
+ @Reference
+ private EndpointDescriptionParser parser;
+
+ public ZookeeperEndpointPublisher() {
+ typeNames.put(EndpointEvent.ADDED, "added");
+ typeNames.put(EndpointEvent.MODIFIED, "modified");
+ typeNames.put(EndpointEvent.MODIFIED_ENDMATCH, "modified");
+ typeNames.put(EndpointEvent.REMOVED, "removed");
+ }
+
+ public ZookeeperEndpointPublisher(ZooKeeper zk, EndpointDescriptionParser parser) {
+ this();
+ this.zk = zk;
+ this.parser = parser;
+ }
+
+ @Activate
+ public void activate() {
+ try {
+ createPath(PATH_PREFIX);
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to create base path");
+ }
+ }
+
+ public void endpointChanged(EndpointEvent event) {
+ try {
+ EndpointDescription endpoint = event.getEndpoint();
+ switch (event.getType()) {
+ case EndpointEvent.ADDED:
+ add(endpoint);
+ break;
+ case EndpointEvent.MODIFIED:
+ case EndpointEvent.MODIFIED_ENDMATCH:
+ modify(endpoint);
+ break;
+ case EndpointEvent.REMOVED:
+ remove(endpoint);
+ break;
+ default:
+ break;
+ }
+ } catch (Exception e) {
+ logException(typeNames.get(event.getType()), event.getEndpoint(), e);
+ }
+ }
+
+ private void logException(String operation, EndpointDescription endpoint, Exception ex) {
+ String msg = String.format("Exception during %s of endpoint %s", operation, endpoint.getId());
+ LOG.error(msg, ex);
+ }
+
+
+ private void add(EndpointDescription endpoint) throws KeeperException, InterruptedException {
+ String path = getZooKeeperPath(endpoint.getId());
+ LOG.info("Exporting path: {}, Endpoint: {}", path, endpoint);
+ createBasePath();
+ byte[] data = getData(endpoint);
+ createEphemeralNode(path, data);
+ }
+
+ private void modify(EndpointDescription endpoint) throws KeeperException, InterruptedException {
+ String path = getZooKeeperPath(endpoint.getId());
+ LOG.info("Changing endpoint in zookeeper. Endpoint: {}, Path: {}", endpoint, path);
+ createBasePath();
+ zk.setData(path, getData(endpoint), -1);
+ }
+
+ private void remove(EndpointDescription endpoint) throws KeeperException, InterruptedException {
+ String path = getZooKeeperPath(endpoint.getId());
+ LOG.info("Removing endpoint in zookeeper. Endpoint: {}, Path: {}", endpoint, path);
+ zk.delete(path, -1);
+ }
+
+ private boolean notEmpty(String part) {
+ return part != null && !part.isEmpty();
+ }
+
+ static String getZooKeeperPath(String name) {
+ String escaped = name.replace('/', '#');
+ return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + escaped;
+ }
+
+ private void createBasePath() throws KeeperException, InterruptedException {
+ String path = ZookeeperEndpointPublisher.getZooKeeperPath("");
+ createPath(path);
+ }
+
+ private byte[] getData(EndpointDescription epd) {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ parser.writeEndpoint(epd, bos);
+ return bos.toByteArray();
+ }
+
+ private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
+ try {
+ zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ } catch (NodeExistsException nee) {
+ // this sometimes happens after a ZooKeeper node dies and the ephemeral node
+ // that belonged to the old session was not yet deleted. We need to make our
+ // session the owner of the node so it won't get deleted automatically -
+ // we do this by deleting and recreating it ourselves.
+ LOG.info("node for endpoint already exists, recreating: {}", fullPath);
+ try {
+ zk.delete(fullPath, -1);
+ } catch (NoNodeException nne) {
+ // it's a race condition, but as long as it got deleted - it's ok
+ }
+ zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+ }
+
+ private void createPath(String path) throws KeeperException, InterruptedException {
+ List<String> parts = asList(path.split("/")).stream()
+ .filter(this::notEmpty)
+ .collect(toList());
+ StringBuilder current = new StringBuilder();
+ for (String part : parts) {
+ current.append('/');
+ current.append(part);
+ createNode(current.toString());
+ }
+ }
+
+ private void createNode(String path) throws KeeperException, InterruptedException {
+ try {
+ if (zk.exists(path, false) == null) {
+ zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ } catch (NodeExistsException nee) {
+ // it's not the first node with this path to ever exist - that's normal
+ }
+ }
+
+}
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/package-info.java
similarity index 76%
rename from discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
rename to discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/package-info.java
index 478e41b..1e835fe 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
+++ b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/package-info.java
@@ -16,12 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.aries.rsa.discovery.local;
-
-import junit.framework.TestCase;
-
-public class ActivatorTest extends TestCase {
-
- public void testActivator() throws Exception {
- }
-}
+@org.osgi.annotation.bundle.Capability( //
+ namespace = "osgi.remoteserviceadmin.discovery", //
+ attribute = {"protocols:List<String>=zookeeper"}, //
+ version = "1.1.0"
+)
+package org.apache.aries.rsa.discovery.zookeeper;
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
deleted file mode 100644
index 7f352a7..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListener.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.publish;
-
-import java.util.Dictionary;
-import java.util.Hashtable;
-
-import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceRegistration;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEvent;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Listens for local EndpointEvents using old and new style listeners and publishes changes to
- * the ZooKeeperEndpointRepository
- */
-@SuppressWarnings("deprecation")
-public class PublishingEndpointListener implements EndpointEventListener, EndpointListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(PublishingEndpointListener.class);
-
- private ServiceRegistration<?> listenerReg;
- private ZookeeperEndpointRepository repository;
-
- public PublishingEndpointListener(ZookeeperEndpointRepository repository) {
- this.repository = repository;
- }
-
- public void start(BundleContext bctx) {
- Dictionary<String, String> props = new Hashtable<>();
- String uuid = bctx.getProperty(Constants.FRAMEWORK_UUID);
- props.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE,
- String.format("(&(%s=*)(%s=%s))", Constants.OBJECTCLASS,
- RemoteConstants.ENDPOINT_FRAMEWORK_UUID, uuid));
- props.put(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID, "true");
- String[] ifAr = {EndpointEventListener.class.getName(), EndpointListener.class.getName()};
- listenerReg = bctx.registerService(ifAr, this, props);
- }
-
- public void stop() {
- if (listenerReg != null) {
- listenerReg.unregister();
- listenerReg = null;
- }
- }
-
- @Override
- public void endpointChanged(EndpointEvent event, String filter) {
- EndpointDescription endpoint = event.getEndpoint();
- switch (event.getType()) {
- case EndpointEvent.ADDED:
- endpointAdded(endpoint, filter);
- break;
- case EndpointEvent.REMOVED:
- endpointRemoved(endpoint, filter);
- break;
- case EndpointEvent.MODIFIED:
- endpointModified(endpoint, filter);
- break;
- }
- }
-
- private void endpointModified(EndpointDescription endpoint, String filter) {
- try {
- repository.modify(endpoint);
- } catch (Exception ex) {
- logException("modification", endpoint, ex);
- }
- }
-
- @Override
- public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
- try {
- repository.add(endpoint);
- } catch (Exception ex) {
- logException("adding", endpoint, ex);
- }
- }
-
- @Override
- public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
- try {
- repository.remove(endpoint);
- } catch (Exception ex) {
- logException("removal", endpoint, ex);
- }
- }
-
- private void logException(String operation, EndpointDescription endpoint, Exception ex) {
- String msg = String.format("Exception during %s of endpoint %s", operation, endpoint.getId());
- LOG.error(msg, ex);
- }
-
-}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
deleted file mode 100644
index 5b0d66b..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepository.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.repository;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEvent;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZookeeperEndpointRepository implements Closeable, Watcher {
- private static final Logger LOG = LoggerFactory.getLogger(ZookeeperEndpointRepository.class);
- private final ZooKeeper zk;
- private final EndpointDescriptionParser parser;
- private EndpointEventListener listener;
- public static final String PATH_PREFIX = "/osgi/service_registry";
-
- private Map<String, EndpointDescription> nodes = new ConcurrentHashMap<>();
-
- public ZookeeperEndpointRepository(ZooKeeper zk) {
- this.zk = zk;
- this.parser = new EndpointDescriptionParser();
- try {
- createPath(PATH_PREFIX);
- } catch (Exception e) {
- throw new IllegalStateException("Unable to create base path");
- }
- this.registerWatcher();
- }
-
- public void addListener(EndpointEventListener listener) {
- this.listener = listener;
- }
-
- /**
- * Read current endpoint stored at a znode
- *
- * @param path
- * @return
- */
- public EndpointDescription read(String path) {
- return nodes.get(path);
- }
-
- public void add(EndpointDescription endpoint) throws KeeperException, InterruptedException {
- String path = getZooKeeperPath(endpoint.getId());
- LOG.info("Exporting endpoint to zookeeper. Endpoint: {}, Path: {}", endpoint, path);
- createBasePath();
- createEphemeralNode(path, getData(endpoint));
- }
-
- public void modify(EndpointDescription endpoint) throws KeeperException, InterruptedException {
- String path = getZooKeeperPath(endpoint.getId());
- LOG.info("Changing endpoint in zookeeper. Endpoint: {}, Path: {}", endpoint, path);
- createBasePath();
- zk.setData(path, getData(endpoint), -1);
- }
-
- public void remove(EndpointDescription endpoint) throws InterruptedException, KeeperException {
- String path = getZooKeeperPath(endpoint.getId());
- LOG.info("Removing endpoint in zookeeper. Endpoint: {}, Path: {}", endpoint, path);
- zk.delete(path, -1);
- }
-
- public Collection<EndpointDescription> getAll() {
- return nodes.values();
- }
-
- /**
- * Removes nulls and empty strings from the given string array.
- *
- * @param strings an array of strings
- * @return a new array containing the non-null and non-empty
- * elements of the original array in the same order
- */
- public static List<String> removeEmpty(List<String> strings) {
- List<String> result = new ArrayList<>();
- if (strings == null) {
- return result;
- }
- for (String s : strings) {
- if (s != null && !s.isEmpty()) {
- result.add(s);
- }
- }
- return result;
- }
-
- public static String getZooKeeperPath(String name) {
- String escaped = name.replace('/', '#');
- return name == null || name.isEmpty() ? PATH_PREFIX : PATH_PREFIX + '/' + escaped;
- }
-
- @Override
- public void process(WatchedEvent event) {
- LOG.info("Received event {}", event);
- switch (event.getType()) {
- case NodeCreated:
- case NodeDataChanged:
- case NodeChildrenChanged:
- watchRecursive(event.getPath());
- break;
- case NodeDeleted:
- handleRemoved(event.getPath());
- break;
- default:
- break;
- }
- }
-
- @Override
- public void close() {
- nodes.clear();
- }
-
- private void createBasePath() throws KeeperException, InterruptedException {
- String path = ZookeeperEndpointRepository.getZooKeeperPath("");
- createPath(path);
- }
-
- private void registerWatcher() {
- try {
- watchRecursive(ZookeeperEndpointRepository.PATH_PREFIX);
- } catch (Exception e) {
- LOG.info(e.getMessage(), e);
- }
- }
-
- /**
- * TODO Check if we handle connection losses correctly
- * @param path
- */
- private void watchRecursive(String path) {
- LOG.info("Watching {}", path);
- try {
- handleZNodeChanged(path);
- List<String> children = zk.getChildren(path, this);
- if (children == null) {
- return;
- }
- for (String child : children) {
- String childPath = (path.endsWith("/") ? path : path + "/") + child;
- watchRecursive(childPath);
- }
- } catch (NoNodeException | SessionExpiredException | ConnectionLossException e) {
- // NoNodeException happens when a node was removed
- LOG.debug(e.getMessage(), e);
- } catch (Exception e) {
- LOG.info(e.getMessage(), e);
- }
- }
-
- private byte[] getData(EndpointDescription epd) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- parser.writeEndpoint(epd, bos);
- return bos.toByteArray();
- }
-
- private void createEphemeralNode(String fullPath, byte[] data) throws KeeperException, InterruptedException {
- try {
- zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- } catch (NodeExistsException nee) {
- // this sometimes happens after a ZooKeeper node dies and the ephemeral node
- // that belonged to the old session was not yet deleted. We need to make our
- // session the owner of the node so it won't get deleted automatically -
- // we do this by deleting and recreating it ourselves.
- LOG.info("node for endpoint already exists, recreating: {}", fullPath);
- try {
- zk.delete(fullPath, -1);
- } catch (NoNodeException nne) {
- // it's a race condition, but as long as it got deleted - it's ok
- }
- zk.create(fullPath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
- }
- }
-
- private void createPath(String path) throws KeeperException, InterruptedException {
- StringBuilder current = new StringBuilder();
- List<String> parts = ZookeeperEndpointRepository.removeEmpty(Arrays.asList(path.split("/")));
- for (String part : parts) {
- current.append('/');
- current.append(part);
- try {
- if (zk.exists(current.toString(), false) == null) {
- zk.create(current.toString(), new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- } catch (NodeExistsException nee) {
- // it's not the first node with this path to ever exist - that's normal
- }
- }
- }
-
- private void handleZNodeChanged(String path) throws KeeperException, InterruptedException {
- Stat stat = new Stat();
- byte[] data = zk.getData(path, this, stat);
- if (data == null || data.length == 0) {
- return;
- }
- EndpointDescription endpoint = parser.readEndpoint(new ByteArrayInputStream(data));
- if (endpoint != null) {
- handleChanged(path, endpoint);
- }
- }
-
- private void handleRemoved(String path) {
- EndpointDescription endpoint = nodes.remove(path);
- EndpointEvent event = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
- if (listener != null) {
- listener.endpointChanged(event, null);
- }
- }
-
- private void handleChanged(String path, EndpointDescription endpoint) {
- EndpointDescription old = nodes.put(path, endpoint);
- int type = old == null ? EndpointEvent.ADDED : EndpointEvent.MODIFIED;
- EndpointEvent event = new EndpointEvent(type, endpoint);
- if (listener != null) {
- listener.endpointChanged(event, null);
- }
- }
-
-}
diff --git a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java b/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
deleted file mode 100644
index 26910cd..0000000
--- a/discovery/zookeeper/src/main/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/EndpointListenerTracker.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.Filter;
-import org.osgi.framework.FrameworkUtil;
-import org.osgi.framework.InvalidSyntaxException;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-import org.osgi.service.remoteserviceadmin.EndpointListener;
-import org.osgi.util.tracker.ServiceTracker;
-
-/**
- * Tracks EndpointListeners and EndpointEventListeners. Delegates to InterestManager to handle them
- */
-@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
-public class EndpointListenerTracker extends ServiceTracker {
- private final InterestManager imManager;
-
- public EndpointListenerTracker(BundleContext bctx, InterestManager imManager) {
- super(bctx, getFilter(), null);
- this.imManager = imManager;
- }
-
- private static Filter getFilter() {
- String filterSt = String.format("(|(objectClass=%s)(objectClass=%s))", EndpointEventListener.class.getName(),
- EndpointListener.class.getName());
- try {
- return FrameworkUtil.createFilter(filterSt);
- } catch (InvalidSyntaxException e) {
- throw new IllegalArgumentException(e.getMessage(), e);
- }
- }
-
- @Override
- public Object addingService(ServiceReference sref) {
- Object epListener = super.addingService(sref);
- imManager.addInterest(sref, epListener);
- return epListener;
- }
-
- @Override
- public void modifiedService(ServiceReference sref, Object epListener) {
- // called when an EndpointListener updates its service properties,
- // e.g. when its interest scope is expanded/reduced
- imManager.addInterest(sref, epListener);
- }
-
- @Override
- public void removedService(ServiceReference sref, Object epListener) {
- imManager.removeInterest(sref);
- super.removedService(sref, epListener);
- }
-
-}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java
new file mode 100644
index 0000000..985f1b0
--- /dev/null
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/InterestManagerTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+import org.easymock.EasyMock;
+import org.easymock.IMocksControl;
+import org.junit.Test;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.remoteserviceadmin.EndpointEventListener;
+
+public class InterestManagerTest {
+
+ @Test
+ public void testEndpointListenerTrackerCustomizer() {
+ IMocksControl c = EasyMock.createControl();
+ ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)");
+ ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)");
+ EndpointEventListener epListener1 = c.createMock(EndpointEventListener.class);
+ EndpointEventListener epListener2 = c.createMock(EndpointEventListener.class);
+
+ c.replay();
+
+ InterestManager im = new InterestManager();
+ // sref has no scope -> nothing should happen
+ assertEquals(0, im.getInterests().size());
+
+ im.bindEndpointEventListener(sref, epListener1);
+ assertEquals(1, im.getInterests().size());
+
+ im.bindEndpointEventListener(sref, epListener1);
+ assertEquals(1, im.getInterests().size());
+
+ im.bindEndpointEventListener(sref2, epListener2);
+ assertEquals(2, im.getInterests().size());
+
+ im.unbindEndpointEventListener(sref);
+ assertEquals(1, im.getInterests().size());
+
+ im.unbindEndpointEventListener(sref);
+ assertEquals(1, im.getInterests().size());
+
+ im.unbindEndpointEventListener(sref2);
+ assertEquals(0, im.getInterests().size());
+
+ c.verify();
+ }
+
+ @SuppressWarnings("unchecked")
+ private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope) {
+ ServiceReference<EndpointEventListener> sref = c.createMock(ServiceReference.class);
+ expect(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).andReturn(scope).atLeastOnce();
+ expect(sref.getProperty(ClientManager.DISCOVERY_ZOOKEEPER_ID)).andReturn(null).atLeastOnce();
+ return sref;
+ }
+
+}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java
new file mode 100644
index 0000000..815c42a
--- /dev/null
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/PublishingEndpointListenerTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.discovery.zookeeper;
+
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.osgi.framework.Constants;
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.EndpointEvent;
+import org.osgi.service.remoteserviceadmin.RemoteConstants;
+
+@RunWith(MockitoJUnitRunner.class)
+public class PublishingEndpointListenerTest {
+ @Mock
+ ZookeeperEndpointPublisher repository;
+
+ @InjectMocks
+ PublishingEndpointListener eli;
+
+ @Test
+ public void testEndpointRemovalAdding() throws KeeperException, InterruptedException {
+ EndpointDescription endpoint = createEndpoint();
+ EndpointEvent event1 = new EndpointEvent(EndpointEvent.ADDED, endpoint);
+ eli.endpointChanged(event1, null);
+ EndpointEvent event2 = new EndpointEvent(EndpointEvent.REMOVED, endpoint);
+ eli.endpointChanged(event2, null);
+
+ verify(repository).endpointChanged(event1);
+ verify(repository).endpointChanged(event2);
+ }
+
+ private EndpointDescription createEndpoint() {
+ Map<String, Object> props = new HashMap<>();
+ props.put(Constants.OBJECTCLASS, new String[] {"myClass"});
+ props.put(RemoteConstants.ENDPOINT_ID, "http://google.de:80/test/sub");
+ props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "myConfig");
+ return new EndpointDescription(props);
+ }
+}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
similarity index 66%
rename from discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
rename to discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
index a65696b..02f6093 100644
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/repository/ZookeeperEndpointRepositoryTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperDiscoveryTest.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.aries.rsa.discovery.zookeeper.repository;
+package org.apache.aries.rsa.discovery.zookeeper;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -35,9 +35,10 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParserImpl;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
@@ -45,46 +46,35 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointEvent;
import org.osgi.service.remoteserviceadmin.EndpointEventListener;
import org.osgi.service.remoteserviceadmin.RemoteConstants;
-public class ZookeeperEndpointRepositoryTest {
-
+@RunWith(MockitoJUnitRunner.class)
+public class ZookeeperDiscoveryTest {
+ final Semaphore semConnected = new Semaphore(0);
+ final Semaphore sem = new Semaphore(0);
private ZooKeeperServer server;
private ZooKeeper zk;
private ServerCnxnFactory factory;
private List<EndpointEvent> events = new ArrayList<>();
+ @Mock
+ private ServiceReference<EndpointEventListener> sref;
@Before
public void before() throws IOException, InterruptedException, KeeperException {
- File target = new File("target");
- File zookeeperDir = new File(target, "zookeeper");
- server = new ZooKeeperServer(zookeeperDir, zookeeperDir, 2000);
- factory = new NIOServerCnxnFactory();
- int clientPort = getClientPort();
- factory.configure(new InetSocketAddress(clientPort), 10);
- factory.startup(server);
- Watcher watcher = new Watcher() {
-
- @Override
- public void process(WatchedEvent event) {
- System.out.println(event);
- }
-
- };
- zk = new ZooKeeper("localhost:" + server.getClientPort(), 1000, watcher);
+ startZookeeperServer();
+ zk = new ZooKeeper("localhost:" + server.getClientPort(), 1000, this::process);
printNodes("/");
}
- private int getClientPort() throws IOException {
- try (ServerSocket serverSocket = new ServerSocket(0)) {
- return serverSocket.getLocalPort();
- }
- }
-
@After
public void after() throws InterruptedException {
//zk.close(); // Seems to cause SessionTimeout error
@@ -93,44 +83,62 @@
@Test
public void test() throws IOException, URISyntaxException, KeeperException, InterruptedException {
- final Semaphore sem = new Semaphore(0);
- EndpointEventListener listener = new EndpointEventListener() {
-
- @Override
- public void endpointChanged(EndpointEvent event, String filter) {
- events.add(event);
- sem.release();
- }
- };
- ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk);
- repository.addListener(listener);
+ EndpointDescriptionParserImpl parser = new EndpointDescriptionParserImpl();
+ ZookeeperEndpointPublisher repository = new ZookeeperEndpointPublisher(zk, parser);
+ repository.activate();
+ InterestManager im = new InterestManager();
+
+ String scope = "("+ Constants.OBJECTCLASS +"=*)";
+ Mockito.when(sref.getProperty(Mockito.eq(EndpointEventListener.ENDPOINT_LISTENER_SCOPE))).thenReturn(scope);
+ im.bindEndpointEventListener(sref, this::onEndpointChanged);
+ ZookeeperEndpointListener zklistener = new ZookeeperEndpointListener(zk, parser, im);
+
+ assertThat(semConnected.tryAcquire(1, SECONDS), equalTo(true));
EndpointDescription endpoint = createEndpoint();
- repository.add(endpoint);
+ repository.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint));
- assertThat(sem.tryAcquire(1000, TimeUnit.SECONDS), equalTo(true));
-
+ assertThat(sem.tryAcquire(100, SECONDS), equalTo(true));
+
String path = "/osgi/service_registry/http:##test.de#service1";
- EndpointDescription ep2 = repository.read(path);
+ EndpointDescription ep2 = zklistener.read(path);
assertNotNull(ep2);
- repository.remove(endpoint);
-
+ repository.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint));
+
assertThat(sem.tryAcquire(1000, TimeUnit.SECONDS), equalTo(true));
assertThat(events.get(0).getType(), equalTo(EndpointEvent.ADDED));
assertThat(events.get(1).getType(), equalTo(EndpointEvent.REMOVED));
assertThat(events.get(0).getEndpoint(), equalTo(endpoint));
assertThat(events.get(1).getEndpoint(), equalTo(endpoint));
-
- repository.close();
+ }
+
+ private void process(WatchedEvent event) {
+ if (event.getState() == KeeperState.SyncConnected) {
+ semConnected.release();
+ }
+ System.out.println(event);
+ }
+
+ private void startZookeeperServer() throws IOException, InterruptedException {
+ File target = new File("target");
+ File zookeeperDir = new File(target, "zookeeper");
+ server = new ZooKeeperServer(zookeeperDir, zookeeperDir, 2000);
+ factory = new NIOServerCnxnFactory();
+ int clientPort = getClientPort();
+ factory.configure(new InetSocketAddress(clientPort), 10);
+ factory.startup(server);
}
- @Test
- public void testGetZooKeeperPath() {
- assertEquals(ZookeeperEndpointRepository.PATH_PREFIX + '/' + "http:##org.example.Test",
- ZookeeperEndpointRepository.getZooKeeperPath("http://org.example.Test"));
+ private int getClientPort() throws IOException {
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ return serverSocket.getLocalPort();
+ }
+ }
- assertEquals(ZookeeperEndpointRepository.PATH_PREFIX, ZookeeperEndpointRepository.getZooKeeperPath(""));
+ private void onEndpointChanged(EndpointEvent event, String filter) {
+ events.add(event);
+ sem.release();
}
private EndpointDescription createEndpoint() {
@@ -143,7 +151,7 @@
return endpoint;
}
- public void printNodes(String path) throws KeeperException, InterruptedException {
+ private void printNodes(String path) throws KeeperException, InterruptedException {
List<String> children = zk.getChildren(path, false);
for (String child : children) {
String newPath = path.endsWith("/") ? path : path + "/";
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java
similarity index 60%
copy from discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
copy to discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java
index 478e41b..31441d3 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
+++ b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/ZookeeperEndpointPublisherPathTest.java
@@ -16,12 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.aries.rsa.discovery.local;
+package org.apache.aries.rsa.discovery.zookeeper;
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
-public class ActivatorTest extends TestCase {
+import org.junit.Test;
- public void testActivator() throws Exception {
+public class ZookeeperEndpointPublisherPathTest {
+
+ @Test
+ public void testGetZooKeeperPath() {
+ assertEquals(ZookeeperEndpointPublisher.PATH_PREFIX + '/' + "http:##org.example.Test",
+ ZookeeperEndpointPublisher.getZooKeeperPath("http://org.example.Test"));
+
+ assertEquals(ZookeeperEndpointPublisher.PATH_PREFIX, ZookeeperEndpointPublisher.getZooKeeperPath(""));
}
+
}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
deleted file mode 100644
index 3884c95..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/publish/PublishingEndpointListenerTest.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.publish;
-
-import static org.easymock.EasyMock.expect;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-import org.osgi.framework.Constants;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEvent;
-import org.osgi.service.remoteserviceadmin.RemoteConstants;
-
-import junit.framework.TestCase;
-
-public class PublishingEndpointListenerTest extends TestCase {
-
- private static final String ENDPOINT_PATH = "/osgi/service_registry/http:##google.de:80#test#sub";
-
- public void testEndpointRemovalAdding() throws KeeperException, InterruptedException {
- IMocksControl c = EasyMock.createNiceControl();
-
- ZooKeeper zk = c.createMock(ZooKeeper.class);
-
- String path = ENDPOINT_PATH;
- expectCreated(zk, path);
- expectDeleted(zk, path);
-
- c.replay();
-
- ZookeeperEndpointRepository repository = new ZookeeperEndpointRepository(zk);
- PublishingEndpointListener eli = new PublishingEndpointListener(repository);
- EndpointDescription endpoint = createEndpoint();
- eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null);
- eli.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint), null); // should do nothing
- eli.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint), null);
- eli.endpointChanged(new EndpointEvent(EndpointEvent.REMOVED, endpoint), null); // should do nothing
-
- c.verify();
- }
-
- private void expectCreated(ZooKeeper zk, String path) throws KeeperException, InterruptedException {
- expect(zk.create(EasyMock.eq(path),
- (byte[])EasyMock.anyObject(),
- EasyMock.eq(Ids.OPEN_ACL_UNSAFE),
- EasyMock.eq(CreateMode.EPHEMERAL)))
- .andReturn("");
- }
-
- private void expectDeleted(ZooKeeper zk, String path) throws InterruptedException, KeeperException {
- zk.delete(EasyMock.eq(path), EasyMock.eq(-1));
- EasyMock.expectLastCall().once();
- }
-
- private EndpointDescription createEndpoint() {
- Map<String, Object> props = new HashMap<>();
- props.put(Constants.OBJECTCLASS, new String[] {"myClass"});
- props.put(RemoteConstants.ENDPOINT_ID, "http://google.de:80/test/sub");
- props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "myConfig");
- return new EndpointDescription(props);
- }
-}
diff --git a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java b/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
deleted file mode 100644
index 79b2d8d..0000000
--- a/discovery/zookeeper/src/test/java/org/apache/aries/rsa/discovery/zookeeper/subscribe/InterfaceMonitorManagerTest.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.aries.rsa.discovery.zookeeper.subscribe;
-
-import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.aries.rsa.discovery.zookeeper.ZooKeeperDiscovery;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
-import org.easymock.EasyMock;
-import org.easymock.IMocksControl;
-import org.junit.Test;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.remoteserviceadmin.EndpointDescription;
-import org.osgi.service.remoteserviceadmin.EndpointEventListener;
-
-public class InterfaceMonitorManagerTest {
-
- @Test
- public void testEndpointListenerTrackerCustomizer() {
- IMocksControl c = EasyMock.createControl();
- ServiceReference<EndpointEventListener> sref = createService(c, "(objectClass=mine)");
- ServiceReference<EndpointEventListener> sref2 = createService(c, "(objectClass=mine)");
- ZookeeperEndpointRepository repository = c.createMock(ZookeeperEndpointRepository.class);
- List<EndpointDescription> endpoints = new ArrayList<>();
- expect(repository.getAll()).andReturn(endpoints).atLeastOnce();
- EndpointEventListener epListener1 = c.createMock(EndpointEventListener.class);
- EndpointEventListener epListener2 = c.createMock(EndpointEventListener.class);
-
- c.replay();
-
- InterestManager eltc = new InterestManager(repository);
- // sref has no scope -> nothing should happen
- assertEquals(0, eltc.getInterests().size());
-
-
- eltc.addInterest(sref, epListener1);
- assertEquals(1, eltc.getInterests().size());
-
- eltc.addInterest(sref, epListener1);
- assertEquals(1, eltc.getInterests().size());
-
- eltc.addInterest(sref2, epListener2);
- assertEquals(2, eltc.getInterests().size());
-
- eltc.removeInterest(sref);
- assertEquals(1, eltc.getInterests().size());
-
- eltc.removeInterest(sref);
- assertEquals(1, eltc.getInterests().size());
-
- eltc.removeInterest(sref2);
- assertEquals(0, eltc.getInterests().size());
-
- c.verify();
- }
-
- @SuppressWarnings("unchecked")
- private ServiceReference<EndpointEventListener> createService(IMocksControl c, String scope) {
- ServiceReference<EndpointEventListener> sref = c.createMock(ServiceReference.class);
- expect(sref.getProperty(EndpointEventListener.ENDPOINT_LISTENER_SCOPE)).andReturn(scope).atLeastOnce();
- expect(sref.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)).andReturn(null).atLeastOnce();
- return sref;
- }
-
-}
diff --git a/discovery/zookeeper/src/test/resources/logback.xml b/discovery/zookeeper/src/test/resources/logback.xml
new file mode 100644
index 0000000..7f2cb48
--- /dev/null
+++ b/discovery/zookeeper/src/test/resources/logback.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<configuration>
+ <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%date %level %logger:%line %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <logger name="org.apache.zookeeper" level="WARN" />
+
+ <root level="info">
+ <appender-ref ref="console"/>
+ </root>
+</configuration>
\ No newline at end of file
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java
index 0b6aadc..a24b140 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryExport.java
@@ -18,20 +18,20 @@
*/
package org.apache.aries.rsa.itests.felix.tcp;
+import static org.awaitility.Awaitility.await;
+
import java.io.ByteArrayInputStream;
-import java.util.List;
import javax.inject.Inject;
-import org.apache.aries.rsa.discovery.endpoint.EndpointDescriptionParser;
+import org.apache.aries.rsa.discovery.zookeeper.ZookeeperEndpointPublisher;
import org.apache.aries.rsa.examples.echotcp.api.EchoService;
import org.apache.aries.rsa.itests.felix.RsaTestBase;
import org.apache.aries.rsa.spi.DistributionProvider;
+import org.apache.aries.rsa.spi.EndpointDescriptionParser;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
+import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,10 +42,14 @@
@RunWith(PaxExam.class)
public class TestDiscoveryExport extends RsaTestBase {
- private static final String GREETER_ZOOKEEPER_NODE = "/osgi/service_registry";
-
@Inject
DistributionProvider tcpProvider;
+
+ @Inject
+ EndpointDescriptionParser parser;
+
+ @Inject
+ ZooKeeper zookeeper;
@Configuration
public static Option[] configure() throws Exception {
@@ -70,44 +74,21 @@
}
private EndpointDescription getEndpoint() throws Exception {
- ZooKeeper zk = new ZooKeeper("localhost:" + ZK_PORT, 1000, new DummyWatcher());
- assertNodeExists(zk, GREETER_ZOOKEEPER_NODE, 10000);
- String endpointPath = getEndpointPath(zk, GREETER_ZOOKEEPER_NODE);
- EndpointDescription epd = getEndpointDescription(zk, endpointPath);
- zk.close();
- return epd;
+ String endpointName = await("Node exists").until(this::getEndpointPath, Matchers.notNullValue());
+ return getEndpointDescription(zookeeper, ZookeeperEndpointPublisher.PATH_PREFIX + "/" + endpointName);
}
private EndpointDescription getEndpointDescription(ZooKeeper zk, String endpointPath)
throws KeeperException, InterruptedException {
byte[] data = zk.getData(endpointPath, false, null);
ByteArrayInputStream is = new ByteArrayInputStream(data);
- return new EndpointDescriptionParser().readEndpoint(is);
+ return parser.readEndpoint(is);
}
- private String getEndpointPath(ZooKeeper zk, String servicePath) throws KeeperException, InterruptedException {
- List<String> children = zk.getChildren(servicePath, false);
- return servicePath + "/" + children.iterator().next();
- }
-
- private void assertNodeExists(ZooKeeper zk, String zNode, int timeout) {
- long endTime = System.currentTimeMillis() + timeout;
- Stat stat = null;
- while (stat == null && System.currentTimeMillis() < endTime) {
- try {
- stat = zk.exists(zNode, null);
- Thread.sleep(200);
- } catch (Exception e) {
- // Ignore
- }
- }
- Assert.assertNotNull("ZooKeeper node " + zNode + " was not found", stat);
- }
-
- private final class DummyWatcher implements Watcher {
- @Override
- public void process(WatchedEvent event) {
- }
+ private String getEndpointPath() throws KeeperException, InterruptedException {
+ return zookeeper.getChildren(ZookeeperEndpointPublisher.PATH_PREFIX, false).stream()
+ .findFirst()
+ .orElse(null);
}
}
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java
index e735f55..4aea83f 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestDiscoveryImport.java
@@ -31,9 +31,8 @@
import javax.inject.Inject;
-import org.apache.aries.rsa.discovery.zookeeper.repository.ZookeeperEndpointRepository;
+import org.apache.aries.rsa.discovery.zookeeper.ZookeeperEndpointPublisher;
import org.apache.aries.rsa.itests.felix.RsaTestBase;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
@@ -49,10 +48,14 @@
@RunWith(PaxExam.class)
public class TestDiscoveryImport extends RsaTestBase {
@Inject
- ZookeeperEndpointRepository repository;
+ ZookeeperEndpointPublisher publisher;
@Inject
BundleContext context;
+
+ private Semaphore sem = new Semaphore(0);;
+
+ private List<EndpointEvent> events = new ArrayList<>();
@Configuration
public static Option[] configure() throws Exception {
@@ -66,30 +69,32 @@
};
}
- @Ignore
@Test
public void testDiscoveryImport() throws Exception {
- final Semaphore sem = new Semaphore(0);
- final List<EndpointEvent> events = new ArrayList<>();
- EndpointEventListener listener = new EndpointEventListener() {
-
- @Override
- public void endpointChanged(EndpointEvent event, String filter) {
- events.add(event);
- sem.release();
- }
- };
+ context.registerService(EndpointEventListener.class, this::endpointChanged, listenerProps());
+ EndpointDescription endpoint = createEndpoint();
+ publisher.endpointChanged(new EndpointEvent(EndpointEvent.ADDED, endpoint));
+ assertTrue(sem.tryAcquire(10, TimeUnit.SECONDS));
+ //assertThat(events.get(0), samePropertyValuesAs(new EndpointEvent(EndpointEvent.ADDED, endpoint)));
+ }
+
+ private Dictionary<String, Object> listenerProps() {
Dictionary<String, Object> eprops = new Hashtable<>();
eprops.put(EndpointEventListener.ENDPOINT_LISTENER_SCOPE, "(objectClass=*)");
- context.registerService(EndpointEventListener.class, listener, eprops);
+ return eprops;
+ }
+
+ private EndpointDescription createEndpoint() {
Map<String, Object> props = new HashMap<>();
props.put(Constants.OBJECTCLASS, new String[]{"my"});
props.put(RemoteConstants.ENDPOINT_ID, "myid");
props.put(RemoteConstants.SERVICE_IMPORTED_CONFIGS, "myconfig");
EndpointDescription endpoint = new EndpointDescription(props);
- repository.add(endpoint);
- assertTrue(sem.tryAcquire(10, TimeUnit.SECONDS));
- //assertThat(events.get(0), samePropertyValuesAs(new EndpointEvent(EndpointEvent.ADDED, endpoint)));
+ return endpoint;
}
+ public void endpointChanged(EndpointEvent event, String filter) {
+ events.add(event);
+ sem.release();
+ }
}
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java
index 78598a9..09aa6fd 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestFindHook.java
@@ -19,10 +19,10 @@
*/
+import static org.awaitility.Awaitility.await;
+
import java.io.IOException;
import java.util.Collection;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
@@ -30,12 +30,13 @@
import org.apache.aries.rsa.itests.felix.RsaTestBase;
import org.apache.aries.rsa.itests.felix.ServerConfiguration;
import org.apache.aries.rsa.itests.felix.TwoContainerPaxExam;
-import org.junit.Assert;
+import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.ops4j.pax.exam.Configuration;
import org.ops4j.pax.exam.Option;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceReference;
@RunWith(TwoContainerPaxExam.class)
@@ -66,48 +67,14 @@
configZKDiscovery()
};
}
-
- public <T> T tryTo(String message, Callable<T> func) throws TimeoutException {
- return tryTo(message, func, 5000);
- }
-
- public <T> T tryTo(String message, Callable<T> func, long timeout) throws TimeoutException {
- Throwable lastException = null;
- long startTime = System.currentTimeMillis();
- while (System.currentTimeMillis() - startTime < timeout) {
- try {
- T result = func.call();
- if (result != null) {
- return result;
- }
- lastException = null;
- } catch (Throwable e) {
- lastException = e;
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- continue;
- }
- }
- TimeoutException ex = new TimeoutException("Timeout while trying to " + message);
- if (lastException != null) {
- ex.addSuppressed(lastException);
- }
- throw ex;
- }
@Test
public void testFind() throws Exception {
- ServiceReference<EchoService> ref = tryTo("get EchoService", new Callable<ServiceReference<EchoService>>() {
+ await().until(() -> getEchoServices().size(), Matchers.equalTo(1));
+ }
- @Override
- public ServiceReference<EchoService> call() throws Exception {
- Collection<ServiceReference<EchoService>> refs = context.getServiceReferences(EchoService.class, null);
- return (refs.size() > 0)? refs.iterator().next() : null;
- }
- }, 10000);
- Assert.assertNotNull(ref);
+ private Collection<ServiceReference<EchoService>> getEchoServices() throws InvalidSyntaxException {
+ return context.getServiceReferences(EchoService.class, null);
}
}
diff --git a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
index 2a038b8..7641219 100644
--- a/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
+++ b/itests/felix/src/test/java/org/apache/aries/rsa/itests/felix/tcp/TestRoundTrip.java
@@ -58,7 +58,7 @@
rsaCore(),
rsaDiscoveryZookeeper(),
rsaProviderTcp(),
- RsaTestBase.echoTcpConsumer(),
+ echoTcpConsumer(),
configZKDiscovery()
};
}
diff --git a/itests/felix/src/test/resources/logback.xml b/itests/felix/src/test/resources/logback.xml
index f4696e4..3b7921f 100644
--- a/itests/felix/src/test/resources/logback.xml
+++ b/itests/felix/src/test/resources/logback.xml
@@ -20,10 +20,12 @@
<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
- <pattern>%date %level %logger [%file : %line] %msg - %mdc %n</pattern>
+ <pattern>%date{HH:mm:ss} %level %logger{40}:%line - %msg%n</pattern>
</encoder>
</appender>
+ <logger name="org.apache.zookeeper" level="WARN" />
+
<root level="info">
<appender-ref ref="console"/>
</root>
diff --git a/parent/pom.xml b/parent/pom.xml
index ab071f9..f65aac2 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -58,15 +58,20 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.annotation</artifactId>
+ <version>7.0.0</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- <scope>test</scope>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.2.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -234,7 +239,7 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
- <version>3.5.1</version>
+ <version>4.2.0</version>
<extensions>true</extensions>
</plugin>
<plugin>
diff --git a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java b/spi/src/main/java/org/apache/aries/rsa/spi/EndpointDescriptionParser.java
similarity index 65%
copy from discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
copy to spi/src/main/java/org/apache/aries/rsa/spi/EndpointDescriptionParser.java
index 478e41b..caa5d01 100644
--- a/discovery/local/src/test/java/org/apache/aries/rsa/discovery/local/ActivatorTest.java
+++ b/spi/src/main/java/org/apache/aries/rsa/spi/EndpointDescriptionParser.java
@@ -16,12 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.aries.rsa.discovery.local;
+package org.apache.aries.rsa.spi;
-import junit.framework.TestCase;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
-public class ActivatorTest extends TestCase {
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
- public void testActivator() throws Exception {
- }
+public interface EndpointDescriptionParser {
+
+ List<EndpointDescription> readEndpoints(InputStream is);
+
+ EndpointDescription readEndpoint(InputStream is);
+
+ void writeEndpoint(EndpointDescription epd, OutputStream os);
}
diff --git a/spi/src/main/java/org/apache/aries/rsa/spi/packageinfo b/spi/src/main/java/org/apache/aries/rsa/spi/packageinfo
index c72722a..1da3be9 100644
--- a/spi/src/main/java/org/apache/aries/rsa/spi/packageinfo
+++ b/spi/src/main/java/org/apache/aries/rsa/spi/packageinfo
@@ -16,4 +16,4 @@
# specific language governing permissions and limitations
# under the License.
#
-version 1.0.0
+version 1.1.0