blob: 240e5eadd986f0067cd2c0665252a085e12819a3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cxf.dosgi.discovery.zookeeper.subscribe;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.cxf.dosgi.discovery.zookeeper.ZooKeeperDiscovery;
import org.apache.cxf.dosgi.discovery.zookeeper.util.Utils;
import org.apache.zookeeper.ZooKeeper;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.service.remoteserviceadmin.EndpointDescription;
import org.osgi.service.remoteserviceadmin.EndpointListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.cxf.dosgi.discovery.local.util.Utils.matchFilter;
/**
 * Manages the EndpointListeners and the scopes they are interested in.
 * For each scope with interested EndpointListeners an InterfaceMonitor is created.
 * The InterfaceMonitor calls back when it detects added or removed external Endpoints.
 * These events are then forwarded to all interested EndpointListeners.
 * <p>
 * Methods that modify the interest bookkeeping are synchronized on this instance. The
 * callbacks handed to the InterfaceMonitor deliberately do not take that lock; they only
 * iterate the per-scope listener list, which is a {@link CopyOnWriteArrayList}.
 */
public class InterfaceMonitorManager {

    private static final Logger LOG = LoggerFactory.getLogger(InterfaceMonitorManager.class);

    private final BundleContext bctx;
    private final ZooKeeper zk;
    // map of EndpointListeners and the scopes they are interested in
    private final Map<ServiceReference<EndpointListener>, List<String>> endpointListenerScopes =
        new HashMap<ServiceReference<EndpointListener>, List<String>>();
    // map of scopes and their interest data
    private final Map<String, Interest> interests = new HashMap<String, Interest>();

    /**
     * Per-scope data: the listeners interested in the scope and the monitor watching it.
     */
    protected static class Interest {
        // copy-on-write so monitor callbacks can iterate without holding the manager's lock
        List<ServiceReference<EndpointListener>> endpointListeners =
            new CopyOnWriteArrayList<ServiceReference<EndpointListener>>();
        InterfaceMonitor monitor;
    }

    /**
     * Creates a manager.
     *
     * @param bctx bundle context used to obtain/release listener services and to match filters
     * @param zk ZooKeeper handle passed to every InterfaceMonitor created by this manager
     */
    public InterfaceMonitorManager(BundleContext bctx, ZooKeeper zk) {
        this.bctx = bctx;
        this.zk = zk;
    }

    /**
     * Registers interest for every scope declared by the given EndpointListener.
     * Listeners flagged with the {@code ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID}
     * property are treated as our own and skipped.
     *
     * @param endpointListener reference to the listener whose scopes should be monitored
     */
    public void addInterest(ServiceReference<EndpointListener> endpointListener) {
        if (isOurOwnEndpointListener(endpointListener)) {
            LOG.debug("Skipping our own EndpointListener");
            return;
        }

        LOG.info("updating EndpointListener interests: {}", endpointListener);
        if (LOG.isDebugEnabled()) {
            LOG.debug("updated EndpointListener properties: {}", Utils.getProperties(endpointListener));
        }
        for (String scope : Utils.getScopes(endpointListener)) {
            String objClass = Utils.getObjectClass(scope);
            LOG.debug("Adding interest in scope {}, objectClass {}", scope, objClass);
            addInterest(endpointListener, scope, objClass);
        }
    }

    /**
     * Tells whether the listener carries the ZooKeeper discovery marker property
     * with a value whose string form parses as {@code true}.
     */
    private static boolean isOurOwnEndpointListener(ServiceReference<EndpointListener> endpointListener) {
        return Boolean.parseBoolean(String.valueOf(
                endpointListener.getProperty(ZooKeeperDiscovery.DISCOVERY_ZOOKEEPER_ID)));
    }

    /**
     * Adds the listener to the interest of the given scope, creating the interest and
     * starting its monitor if the scope is new. For an already monitored scope the
     * listener is notified of all endpoints currently known to the monitor.
     * <p>
     * All bookkeeping is completed before the monitor is started or the listener is
     * called, so an exception thrown from either leaves the maps consistent and the
     * listener removable through {@link #removeInterest(ServiceReference)}.
     *
     * @param endpointListener reference to the interested listener
     * @param scope the filter scope the listener is interested in
     * @param objClass the object class derived from the scope, passed to the monitor
     */
    // the generic varargs call to Arrays.asList raises an unchecked warning on older compilers
    @SuppressWarnings("unchecked")
    public synchronized void addInterest(ServiceReference<EndpointListener> endpointListener,
                                         String scope, String objClass) {
        // get or create interest for given scope and add listener to it
        Interest interest = interests.get(scope);
        boolean created = interest == null;
        if (created) {
            interest = new Interest();
            // add the listener before the monitor starts so we don't miss events
            interest.endpointListeners.add(endpointListener);
            // create the monitor before publishing the interest, so the map never
            // holds an interest without a monitor if creation fails
            interest.monitor = createInterfaceMonitor(scope, objClass, interest);
            interests.put(scope, interest);
        } else if (!interest.endpointListeners.contains(endpointListener)) {
            // interest already exists, so just add listener to it
            interest.endpointListeners.add(endpointListener);
        }

        // add scope to listener's scopes list (done before any callback can throw)
        List<String> scopes = endpointListenerScopes.get(endpointListener);
        if (scopes == null) {
            scopes = new ArrayList<String>(1);
            endpointListenerScopes.put(endpointListener, scopes);
        }
        if (!scopes.contains(scope)) {
            scopes.add(scope);
        }

        if (created) {
            interest.monitor.start();
        } else {
            // notify listener of all known endpoints for given scope
            // (as EndpointListener contract requires of all added/modified listeners)
            for (EndpointDescription endpoint : interest.monitor.getEndpoints()) {
                notifyListeners(endpoint, scope, true, Arrays.asList(endpointListener));
            }
        }
    }

    /**
     * Removes the listener from every scope it was registered for. A scope whose last
     * listener is removed has its monitor closed and its interest discarded.
     *
     * @param endpointListener reference to the listener that is no longer interested
     */
    public synchronized void removeInterest(ServiceReference<EndpointListener> endpointListener) {
        LOG.info("removing EndpointListener interests: {}", endpointListener);
        List<String> scopes = endpointListenerScopes.get(endpointListener);
        if (scopes == null) {
            return;
        }

        for (String scope : scopes) {
            Interest interest = interests.get(scope);
            if (interest != null) {
                interest.endpointListeners.remove(endpointListener);
                if (interest.endpointListeners.isEmpty()) {
                    interest.monitor.close();
                    interests.remove(scope);
                }
            }
        }
        endpointListenerScopes.remove(endpointListener);
    }

    /**
     * Creates the monitor for a scope. The monitor is given a callback that forwards
     * added/removed endpoints to the listeners currently registered in the interest.
     *
     * @param scope the filter scope to monitor
     * @param objClass the object class passed to the monitor
     * @param interest the interest whose listeners receive the forwarded events
     * @return a new, not yet started InterfaceMonitor
     */
    protected InterfaceMonitor createInterfaceMonitor(final String scope, String objClass,
                                                      final Interest interest) {
        // holding this object's lock in the callbacks can lead to a deadlock with InterfaceMonitor
        EndpointListener endpointListener = new EndpointListener() {

            public void endpointRemoved(EndpointDescription endpoint, String matchedFilter) {
                notifyListeners(endpoint, scope, false, interest.endpointListeners);
            }

            public void endpointAdded(EndpointDescription endpoint, String matchedFilter) {
                notifyListeners(endpoint, scope, true, interest.endpointListeners);
            }
        };
        return new InterfaceMonitor(zk, objClass, endpointListener, scope);
    }

    /**
     * Forwards an endpoint event to each of the given listeners, provided the endpoint
     * matches the scope. Listener services are obtained from the bundle context for the
     * duration of the call and released afterwards.
     *
     * @param endpoint the endpoint that was added or removed
     * @param currentScope the scope the event belongs to
     * @param isAdded {@code true} for an added endpoint, {@code false} for a removed one
     * @param endpointListeners references to the listeners to notify
     */
    private void notifyListeners(EndpointDescription endpoint, String currentScope, boolean isAdded,
                                 List<ServiceReference<EndpointListener>> endpointListeners) {
        for (ServiceReference<EndpointListener> endpointListenerRef : endpointListeners) {
            EndpointListener service = bctx.getService(endpointListenerRef);
            if (service == null) {
                // getService yields null once the service is gone; there is nothing to
                // call and nothing to unget
                LOG.info("listening service was unregistered, ignoring");
                continue;
            }
            try {
                LOG.trace("matching {} against {}", endpoint, currentScope);
                if (matchFilter(bctx, currentScope, endpoint)) {
                    LOG.debug("Matched {} against {}", endpoint, currentScope);
                    notifyListener(endpoint, currentScope, isAdded, endpointListenerRef.getBundle(),
                                   service);
                }
            } finally {
                bctx.ungetService(endpointListenerRef);
            }
        }
    }

    /**
     * Invokes the matching callback on a single listener, unless the listener's
     * registering bundle is no longer available.
     *
     * @param endpoint the endpoint that was added or removed
     * @param currentScope the scope reported to the listener as the matched filter
     * @param isAdded {@code true} to call endpointAdded, {@code false} to call endpointRemoved
     * @param endpointListenerBundle bundle of the listener reference, {@code null} if unregistered
     * @param endpointListener the listener service to call
     */
    private void notifyListener(EndpointDescription endpoint, String currentScope, boolean isAdded,
                                Bundle endpointListenerBundle, EndpointListener endpointListener) {
        if (endpointListenerBundle == null) {
            LOG.info("listening service was unregistered, ignoring");
        } else if (isAdded) {
            LOG.info("calling EndpointListener.endpointAdded: " + endpointListener + " from bundle "
                    + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
            endpointListener.endpointAdded(endpoint, currentScope);
        } else {
            LOG.info("calling EndpointListener.endpointRemoved: " + endpointListener + " from bundle "
                    + endpointListenerBundle.getSymbolicName() + " for endpoint: " + endpoint);
            endpointListener.endpointRemoved(endpoint, currentScope);
        }
    }

    /**
     * Closes every monitor and forgets all interests and listener scopes.
     */
    public synchronized void close() {
        for (Interest interest : interests.values()) {
            interest.monitor.close();
        }
        interests.clear();
        endpointListenerScopes.clear();
    }

    /**
     * Only for test case!
     *
     * @return the live (not copied) map of scopes to interests
     */
    protected synchronized Map<String, Interest> getInterests() {
        return interests;
    }

    /**
     * Only for test case!
     *
     * @return the live (not copied) map of listeners to their scopes
     */
    protected synchronized Map<ServiceReference<EndpointListener>, List<String>> getEndpointListenerScopes() {
        return endpointListenerScopes;
    }
}