Refactor TopologyManagerImport
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
index 8071903..e6b47f8 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
@@ -18,11 +18,13 @@
*/
package org.apache.aries.rsa.topologymanager.importer;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.stream.Collectors;
/**
* Minimal implementation of a thread-safe map where each key can have multiple values.
@@ -68,6 +70,10 @@
return map.keySet();
}
+ public Set<V> allValues() {
+ return map.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+ }
+
public void clear() {
map.clear();
}
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index 2dd995b..95d3aee 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -18,8 +18,6 @@
*/
package org.apache.aries.rsa.topologymanager.importer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
@@ -84,21 +82,14 @@
LOG.info("Interrupted while waiting for {} to terminate", execService);
Thread.currentThread().interrupt();
}
- closeAllImports();
- }
-
- private void closeAllImports() {
+ // close all imports
importPossibilities.clear();
- for (String filter : importedServices.keySet()) {
- unImportForGoneEndpoints(filter);
- }
+ importedServices.allValues().forEach(ir -> unimportService(ir.getImportReference()));
}
public void add(RemoteServiceAdmin rsa) {
rsaSet.add(rsa);
- for (String filter : importPossibilities.keySet()) {
- triggerSynchronizeImports(filter);
- }
+ importPossibilities.keySet().forEach(this::synchronizeImportsAsync);
}
public void remove(RemoteServiceAdmin rsa) {
@@ -108,124 +99,96 @@
@Override
public void remoteAdminEvent(RemoteServiceAdminEvent event) {
if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
- unImport(event.getImportReference());
+ unimportService(event.getImportReference());
}
}
- private void triggerSynchronizeImports(final String filter) {
+ private void synchronizeImportsAsync(final String filter) {
LOG.debug("Import of a service for filter {} was queued", filter);
if (!rsaSet.isEmpty()) {
- execService.execute(new Runnable() {
- public void run() {
- synchronizeImports(filter);
- }
- });
+ execService.execute(() -> synchronizeImports(filter));
}
}
+ /**
+ * Synchronizes the actual imports with the possible imports for the given filter,
+ * i.e. unimports previously imported endpoints that are no longer possible,
+ * and imports new possible endpoints that are not already imported.
+ *
+ * @param filter the filter whose endpoints are synchronized
+ */
private void synchronizeImports(final String filter) {
try {
- unImportForGoneEndpoints(filter);
- importServices(filter);
+ // unimport endpoints that are no longer possible
+ unimportRemovedServices(filter);
+ // import new endpoints
+ importAddedServices(filter);
+ // TODO but optional: if the service is already imported and the endpoint is still
+ // in the list of possible imports check if a "better" endpoint is now in the list
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
// Notify EndpointListeners? NO!
}
- private void importServices(String filter) {
- Set<ImportRegistration> importRegistrations = importedServices.get(filter);
- for (EndpointDescription endpoint : importPossibilities.get(filter)) {
- // TODO but optional: if the service is already imported and the endpoint is still
- // in the list of possible imports check if a "better" endpoint is now in the list
- if (!alreadyImported(endpoint, importRegistrations)) {
- ImportRegistration ir = importService(endpoint);
- if (ir != null) {
- // import was successful
- importedServices.put(filter, ir);
- }
- }
- }
+ /**
+ * Checks if an endpoint is included in a set of import registrations.
+ *
+ * @param imported the import registrations that may include the endpoint
+ * @param endpoint the endpoint to check
+ * @return whether the given endpoint is imported
+ */
+ private boolean isImported(Set<ImportRegistration> imported, EndpointDescription endpoint) {
+ return imported.stream()
+ .map(ImportRegistration::getImportReference)
+ .anyMatch(ir -> ir != null && endpoint.equals(ir.getImportedEndpoint()));
}
- private boolean alreadyImported(EndpointDescription endpoint, Set<ImportRegistration> importRegistrations) {
- for (ImportRegistration ir : importRegistrations) {
- final ImportReference importReference = ir.getImportReference();
- if (importReference == null) {
- LOG.debug("ImportRegistration {} already closed", ir);
- continue;
- }
- if (endpoint.equals(importReference.getImportedEndpoint())) {
- return true;
- }
- }
- return false;
+ private void importAddedServices(String filter) {
+ Set<EndpointDescription> possible = importPossibilities.get(filter);
+ Set<ImportRegistration> imported = importedServices.get(filter);
+ possible.stream()
+ .filter(endpoint -> !isImported(imported, endpoint)) // filter out already imported
+ .forEach(endpoint -> importService(filter, endpoint)); // import the new endpoints
+ }
+
+ private void unimportRemovedServices(String filter) {
+ Set<EndpointDescription> possible = importPossibilities.get(filter);
+ Set<ImportRegistration> imported = importedServices.get(filter);
+ imported.stream()
+ .map(ImportRegistration::getImportReference)
+ .filter(ir -> ir != null && !possible.contains(ir.getImportedEndpoint())) // filter out possibles
+ .forEach(this::unimportService); // unimport the non-possibles
}
/**
- * Tries to import the service with each rsa until one import is successful
+ * Tries to import the service with each rsa until one import is successful.
*
+ * @param filter the filter that matched the endpoint
* @param endpoint endpoint to import
- * @return import registration of the first successful import
*/
- private ImportRegistration importService(EndpointDescription endpoint) {
+ private void importService(String filter, EndpointDescription endpoint) {
for (RemoteServiceAdmin rsa : rsaSet) {
ImportRegistration ir = rsa.importService(endpoint);
if (ir != null) {
if (ir.getException() == null) {
LOG.debug("Service import was successful {}", ir);
- return ir;
+ importedServices.put(filter, ir);
+ return;
} else {
LOG.info("Error importing service " + endpoint, ir.getException());
}
}
}
- return null;
}
- private void unImportForGoneEndpoints(String filter) {
- Set<ImportRegistration> importRegistrations = importedServices.get(filter);
- Set<EndpointDescription> endpoints = importPossibilities.get(filter);
- for (ImportRegistration ir : importRegistrations) {
- final ImportReference importReference = ir.getImportReference();
- if (importReference == null) {
- LOG.debug("Unable to get ImportReference for ImportRegistration {}: already closed", ir);
- continue;
- }
- EndpointDescription endpoint = importReference.getImportedEndpoint();
- if (endpoint == null) {
- LOG.debug("Unable to get EndpointDescription of ImportReference for ImportRegistration {}: already closed", ir);
- continue;
- }
- if (!endpoints.contains(endpoint)) {
- unImport(importReference);
- }
- }
- }
-
- private void unImport(ImportReference ref) {
- List<ImportRegistration> removed = new ArrayList<>();
- Set<String> imported = importedServices.keySet();
- for (String key : imported) {
- for (ImportRegistration ir : importedServices.get(key)) {
- final ImportReference importReference = ir.getImportReference();
- if (importReference == null) {
- LOG.debug("Unable to get ImportReference for ImportRegistration {}: already closed", ir);
- continue;
- }
- if (importReference.equals(ref)) {
- removed.add(ir);
- }
- }
- }
- closeAll(removed);
- }
-
- private void closeAll(List<ImportRegistration> removed) {
- for (ImportRegistration ir : removed) {
- importedServices.remove(ir);
- ir.close();
- }
+ private void unimportService(ImportReference ref) {
+ importedServices.allValues().stream()
+ .filter(ir -> ref != null && ref.equals(ir.getImportReference()))
+ .forEach(ir -> {
+ importedServices.remove(ir);
+ ir.close();
+ });
}
@Override
@@ -240,17 +203,15 @@
importPossibilities.put(filter, endpoint);
break;
case EndpointEvent.REMOVED:
+ case EndpointEvent.MODIFIED_ENDMATCH:
importPossibilities.remove(filter, endpoint);
break;
case EndpointEvent.MODIFIED:
importPossibilities.remove(filter, endpoint);
importPossibilities.put(filter, endpoint);
break;
- case EndpointEvent.MODIFIED_ENDMATCH:
- importPossibilities.remove(filter, endpoint);
- break;
}
- triggerSynchronizeImports(filter);
+ synchronizeImportsAsync(filter);
}
}