blob: 67f63e423508c30f130bef0281a1140fdacdeac3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aries.typedevent.remote.remoteservices.impl;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static org.osgi.namespace.service.ServiceNamespace.SERVICE_NAMESPACE;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.aries.typedevent.remote.api.RemoteEventConstants;
import org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus;
import org.osgi.annotation.bundle.Capability;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
import org.osgi.framework.Filter;
import org.osgi.framework.FrameworkUtil;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.typedevent.TypedEventBus;
import org.osgi.service.typedevent.annotations.RequireTypedEvents;
/**
 * This class implements {@link RemoteEventBus} and is responsible for receiving
 * events from remote frameworks and publishing them in the local framework.
 * <p>
 * It keeps track of the topics (and optional filters) that local services are
 * interested in, and advertises the combined interest through the
 * {@code REMOTE_EVENT_FILTERS} property of its own service registration.
 * <p>
 * All mutable state is guarded by {@link #lock}. The service registry is never
 * called while the lock is held.
 */
@Capability(namespace=SERVICE_NAMESPACE, attribute="objectClass:List<String>=org.apache.aries.typedevent.remote.remoteservices.spi.RemoteEventBus", uses=RemoteEventBus.class)
@RequireTypedEvents
public class RemoteEventBusImpl implements RemoteEventBus {

    /** The local event bus that accepted remote events are delivered to */
    private final TypedEventBus eventBus;

    /** The registration of this service, or null when not registered. Guarded by lock */
    private ServiceRegistration<RemoteEventBus> reg;

    /**
     * The combined interest of all local services, keyed by topic. A null
     * value means that every event on the topic is wanted. The map is replaced
     * as a whole (never modified in place) while holding lock.
     */
    private Map<String, Filter> topicsToFilters = new HashMap<>();

    /** The interests of each local service, keyed by the id it was added with. Guarded by lock */
    private final Map<Long, Map<String, Filter>> servicesToInterests = new HashMap<>();

    private final Object lock = new Object();

    /**
     * @param eventBus the local event bus used to deliver the events received
     *                 from remote frameworks
     */
    public RemoteEventBusImpl(TypedEventBus eventBus) {
        this.eventBus = eventBus;
    }

    /**
     * Register this object as a {@link RemoteEventBus} service and then set
     * the registration properties to reflect the interests known so far.
     *
     * @param ctx the context used to register the service
     */
    public void init(BundleContext ctx) {
        ServiceRegistration<RemoteEventBus> reg = ctx.registerService(RemoteEventBus.class, this, null);
        Map<String, Filter> filters;
        synchronized (lock) {
            this.reg = reg;
            filters = topicsToFilters;
        }
        updateReg(filters);
    }

    /**
     * Unregister the service if it is currently registered. Calling this
     * method when there is no registration does nothing.
     */
    public void destroy() {
        try {
            ServiceRegistration<?> reg;
            synchronized (lock) {
                reg = this.reg;
                this.reg = null;
            }
            if (reg != null) {
                reg.unregister();
            }
        } catch (IllegalStateException ise) {
            // The registration was already unregistered, so there is nothing left to do
            // TODO log
        }
    }

    /**
     * Deliver an event received from a remote framework to the local event
     * bus. The event is only delivered if there is a local interest in the
     * topic and the interest has either no filter or a filter matching the
     * event properties. Delivered events carry the
     * {@link RemoteEventConstants#REMOTE_EVENT_MARKER} property.
     *
     * @param topic      the topic of the event
     * @param properties the event data. This map is not modified.
     */
    @Override
    public void notify(String topic, Map<String, Object> properties) {
        boolean hasTopicInterest;
        Filter filter;
        synchronized (lock) {
            // A null filter is a valid value, so the key must be checked separately
            hasTopicInterest = topicsToFilters.containsKey(topic);
            filter = topicsToFilters.get(topic);
        }
        if (hasTopicInterest) {
            if (filter == null || filter.matches(properties)) {
                // Mark a copy so that the caller's map is left untouched and
                // unmodifiable maps can be delivered as well
                Map<String, Object> event = new HashMap<>(properties);
                event.put(RemoteEventConstants.REMOTE_EVENT_MARKER, Boolean.TRUE);
                eventBus.deliverUntyped(topic, event);
            } else {
                //TODO log filter mismatch
            }
        } else {
            // TODO log topic mismatch
        }
    }

    /**
     * Update the data structures and registration to reflect the topic interests
     * of the local framework
     *
     * @param id     the id of the interested service. Any interest previously
     *               stored for the same id is replaced.
     * @param topics the topics that the service is interested in
     * @param filter the filter that events on those topics must match, or
     *               null if every event on the topics is wanted
     */
    void updateLocalInterest(Long id, List<String> topics, Filter filter) {
        // Built by hand because Collectors.toMap rejects null values,
        // and a null filter is a legitimate "no filter" interest
        Map<String, Filter> newData = new HashMap<>();
        for (String topic : topics) {
            newData.put(topic, filter);
        }
        Map<String, Filter> updatedFilters;
        synchronized (lock) {
            servicesToInterests.put(id, newData);
            topicsToFilters = getUpdatedFilters();
            updatedFilters = topicsToFilters;
        }
        updateReg(updatedFilters);
    }

    /**
     * Merge the interests of all services into a single new map from topic to
     * filter. Filters registered by different services for the same topic are
     * merged using {@link #combineFilters(Filter, Filter)}.
     *
     * @return a new map which is not shared with the per-service data
     */
    private Map<String, Filter> getUpdatedFilters() {
        synchronized (lock) {
            Map<String, Filter> combined = new HashMap<>();
            for (Map<String, Filter> interests : servicesToInterests.values()) {
                for (Entry<String, Filter> interest : interests.entrySet()) {
                    String topic = interest.getKey();
                    Filter filter = interest.getValue();
                    // containsKey is required as the existing value may be null
                    if (combined.containsKey(topic)) {
                        filter = combineFilters(combined.get(topic), filter);
                    }
                    combined.put(topic, filter);
                }
            }
            return combined;
        }
    }

    /**
     * Create a filter which matches whenever either of the supplied filters
     * matches.
     *
     * @param a the first filter, null meaning that everything is wanted
     * @param b the second filter, null meaning that everything is wanted
     * @return the combined filter, or null if either input was null, because
     *         wanting everything already includes whatever the other filter
     *         selects
     * @throws IllegalStateException if the combined filter cannot be created
     */
    private Filter combineFilters(Filter a, Filter b) {
        if (a == null || b == null) {
            return null;
        }
        try {
            return FrameworkUtil.createFilter("(|" + a.toString() + b.toString() + ")");
        } catch (InvalidSyntaxException e) {
            throw new IllegalStateException("Unable to combine the filters " + a + " and " + b, e);
        }
    }

    /**
     * Set the properties of the service registration so that they advertise
     * the supplied interests. Nothing happens if the service is not
     * registered, and the properties are left alone if the advertised filters
     * are already correct.
     *
     * @param filters the interests to advertise, keyed by topic
     */
    private void updateReg(Map<String, Filter> filters) {
        Map<String, Filter> current = filters;
        while (true) {
            ServiceRegistration<?> registration;
            synchronized (lock) {
                registration = this.reg;
            }
            if (registration == null) {
                return;
            }

            // Each entry is "<topic>=<filter>", with an empty filter when there is none
            List<String> remoteFilters = current.entrySet().stream()
                    .map(e -> e.getKey() + "=" + (e.getValue() == null ? "" : e.getValue().toString()))
                    .collect(toList());

            // Only update if there is a change
            Object existingFilters = registration.getReference().getProperty(REMOTE_EVENT_FILTERS);
            if (!remoteFilters.equals(existingFilters)) {
                Hashtable<String, Object> props = new Hashtable<>();
                props.put(Constants.SERVICE_EXPORTED_INTERFACES, RemoteEventBus.class.getName());
                props.put(Constants.SERVICE_EXPORTED_INTENTS, "osgi.basic");
                props.put(REMOTE_EVENT_FILTERS, remoteFilters);
                registration.setProperties(props);
            }

            // Deal with a race condition if another thread changed the
            // interests while the properties were being set outside the lock.
            // In that case go round again with the latest interests.
            Map<String, Filter> latest;
            synchronized (lock) {
                latest = topicsToFilters;
            }
            if (latest.equals(current)) {
                return;
            }
            current = latest;
        }
    }

    /**
     * Remove the interests stored for a service and update the registration
     * to match. Nothing happens if no interest is stored for the id.
     *
     * @param id the id that the interest was added with
     */
    void removeLocalInterest(Long id) {
        Map<String, Filter> updatedFilters;
        synchronized (lock) {
            if (servicesToInterests.remove(id) == null) {
                return;
            }
            topicsToFilters = getUpdatedFilters();
            updatedFilters = topicsToFilters;
        }
        updateReg(updatedFilters);
    }
}