blob: ff3a13e7d459cc7993c710db0a6d37e628f32b26 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aries.typedevent.remote.spi;
import static java.lang.Boolean.TRUE;
import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_FILTER;
import static org.osgi.service.typedevent.TypedEventConstants.TYPED_EVENT_TOPICS;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.typedevent.UntypedEventHandler;
/**
* A simple helper class used to manage the registrations of {@link UntypedEventHandler}
* services in the local service registry, used to feed events into the remote events
* implementation.
*
* Implementations should extend this type and override the {@link #notifyUntyped(String, Map)} method
* to receive events. The set of events received can be altered by calling {@link #updateTargets(Map)}.
*/
public abstract class LocalEventConsumerManager implements UntypedEventHandler {
/**
* A service property indicating that the event handler is a proxy created for a remote node and so
* should not be considered as a local interest.
*/
public static final String ARIES_LOCAL_EVENT_PROXY = "org.apache.aries.typedevent.remote.spi.local.proxy";
/**
* A filter to exclude local proxy interests from remote nodes
*/
public static final String ARIES_LOCAL_EVENT_PROXY_EXCLUSION_FILTER = "(!(" + ARIES_LOCAL_EVENT_PROXY + "=true))";
private final Object lock = new Object();
private final Map<String, ServiceRegistration<UntypedEventHandler>> listenerRegistrations = new HashMap<>();
private final Map<String, String> topicsToFilters = new HashMap<>();
private BundleContext ctx;
/**
* Starts this manager, registering any necessary whiteboard services with the
* appropriate topic and filters;
* @param ctx
*/
public final void start(BundleContext ctx) {
synchronized (lock) {
this.ctx = ctx;
}
updateServiceRegistrations();
}
/**
* Stops this manager, unregistering any whiteboard services
*/
public final void stop() {
synchronized (lock) {
this.ctx = null;
}
Map<String, ServiceRegistration<UntypedEventHandler>> toUnregister;
synchronized (lock) {
toUnregister = new HashMap<>(listenerRegistrations);
listenerRegistrations.clear();
}
toUnregister.values().stream().forEach(this::safeUnregister);
}
private void updateServiceRegistrations() {
Map<String, String> possibleUpdates = new HashMap<String, String>();
Map<String, ServiceRegistration<UntypedEventHandler>> toUnregister;
synchronized (lock) {
possibleUpdates = new HashMap<>(topicsToFilters);
toUnregister = listenerRegistrations.entrySet().stream()
.filter(e -> !topicsToFilters.containsKey(e.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
listenerRegistrations.keySet().removeAll(toUnregister.keySet());
}
toUnregister.values().stream().forEach(this::safeUnregister);
for (Entry<String, String> entry : possibleUpdates.entrySet()) {
String topic = entry.getKey();
String filter = entry.getValue();
ServiceRegistration<UntypedEventHandler> reg;
BundleContext ctx;
synchronized (lock) {
reg = listenerRegistrations.get(topic);
ctx = this.ctx;
}
if(reg == null) {
if(ctx != null) {
Dictionary<String, Object> props = new Hashtable<>();
props.put(TYPED_EVENT_TOPICS, topic);
props.put(ARIES_LOCAL_EVENT_PROXY, TRUE);
if(filter != null && !filter.contentEquals("")) {
props.put(TYPED_EVENT_FILTER, filter);
}
reg = ctx.registerService(UntypedEventHandler.class, this, props);
synchronized (lock) {
ServiceRegistration<UntypedEventHandler> oldReg = listenerRegistrations.putIfAbsent(topic, reg);
if(oldReg == null) {
reg = null;
}
}
if(reg != null) {
reg.unregister();
}
}
} else if(ctx != null) {
Dictionary<String, Object> props = new Hashtable<>();
props.put(TYPED_EVENT_TOPICS, topic);
props.put(ARIES_LOCAL_EVENT_PROXY, TRUE);
if(filter != null && !filter.contentEquals("")) {
if(filter.equals(reg.getReference().getProperty(TYPED_EVENT_FILTER))) {
// Filter unchanged - no need to update
continue;
}
props.put(TYPED_EVENT_FILTER, filter);
} else if (reg.getReference().getProperty(TYPED_EVENT_FILTER) == null) {
// Filter unchanged - no need to update
continue;
}
reg.setProperties(props);
}
}
boolean changed;
synchronized (lock) {
changed = !possibleUpdates.equals(topicsToFilters);
}
if(changed) {
updateServiceRegistrations();
}
}
private void safeUnregister(ServiceRegistration<?> reg) {
try {
reg.unregister();
} catch (IllegalStateException ise) {
// Just ignore it
}
}
/**
* Set the topic and filter targets for which whiteboard listeners
* should be registered
* @param updated - A Map of topic names (or globs) to filters
*/
protected final void updateTargets(Map<String, String> updated) {
synchronized (lock) {
topicsToFilters.clear();
topicsToFilters.putAll(updated);
}
updateServiceRegistrations();
}
}