Refactor TopologyManagerImport
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
index 8071903..e6b47f8 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/MultiMap.java
@@ -18,11 +18,13 @@
  */
 package org.apache.aries.rsa.topologymanager.importer;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.stream.Collectors;
 
 /**
  * Minimal implementation of a thread-safe map where each key can have multiple values.
@@ -68,6 +70,10 @@
         return map.keySet();
     }
 
+    public Set<V> allValues() {
+        return map.values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+    }
+
     public void clear() {
         map.clear();
     }
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index 2dd995b..95d3aee 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -18,8 +18,6 @@
  */
 package org.apache.aries.rsa.topologymanager.importer;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
@@ -84,21 +82,14 @@
             LOG.info("Interrupted while waiting for {} to terminate", execService);
             Thread.currentThread().interrupt();
         }
-        closeAllImports();
-    }
-
-    private void closeAllImports() {
+        // close all imports
         importPossibilities.clear();
-        for (String filter : importedServices.keySet()) {
-            unImportForGoneEndpoints(filter);
-        }
+        importedServices.allValues().forEach(ir -> unimportService(ir.getImportReference()));
     }
 
     public void add(RemoteServiceAdmin rsa) {
         rsaSet.add(rsa);
-        for (String filter : importPossibilities.keySet()) {
-            triggerSynchronizeImports(filter);
-        }
+        importPossibilities.keySet().forEach(this::synchronizeImportsAsync);
     }
 
     public void remove(RemoteServiceAdmin rsa) {
@@ -108,124 +99,96 @@
     @Override
     public void remoteAdminEvent(RemoteServiceAdminEvent event) {
         if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
-            unImport(event.getImportReference());
+            unimportService(event.getImportReference());
         }
     }
 
-    private void triggerSynchronizeImports(final String filter) {
+    private void synchronizeImportsAsync(final String filter) {
         LOG.debug("Import of a service for filter {} was queued", filter);
         if (!rsaSet.isEmpty()) {
-            execService.execute(new Runnable() {
-                public void run() {
-                    synchronizeImports(filter);
-                }
-            });
+            execService.execute(() -> synchronizeImports(filter));
         }
     }
 
+    /**
+     * Synchronizes the actual imports with the possible imports for the given filter,
+     * i.e. unimports previously imported endpoints that are no longer possible,
+     * and imports new possible endpoints that are not already imported.
+     *
+     * @param filter the filter whose endpoints are synchronized
+     */
     private void synchronizeImports(final String filter) {
         try {
-            unImportForGoneEndpoints(filter);
-            importServices(filter);
+            // unimport endpoints that are no longer possible
+            unimportRemovedServices(filter);
+            // import new endpoints
+            importAddedServices(filter);
+            // TODO but optional: if the service is already imported and the endpoint is still
+            // in the list of possible imports check if a "better" endpoint is now in the list
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
         }
         // Notify EndpointListeners? NO!
     }
 
-    private void importServices(String filter) {
-        Set<ImportRegistration> importRegistrations = importedServices.get(filter);
-        for (EndpointDescription endpoint : importPossibilities.get(filter)) {
-            // TODO but optional: if the service is already imported and the endpoint is still
-            // in the list of possible imports check if a "better" endpoint is now in the list
-            if (!alreadyImported(endpoint, importRegistrations)) {
-                ImportRegistration ir = importService(endpoint);
-                if (ir != null) {
-                    // import was successful
-                    importedServices.put(filter, ir);
-                }
-            }
-        }
+    /**
+     * Checks if an endpoint is included in a set of import registrations.
+     *
+     * @param imported the import registrations that may include the endpoint
+     * @param endpoint the endpoint to check
+     * @return whether the given endpoint is imported
+     */
+    private boolean isImported(Set<ImportRegistration> imported, EndpointDescription endpoint) {
+        return imported.stream()
+            .map(ImportRegistration::getImportReference)
+            .anyMatch(ir -> ir != null && endpoint.equals(ir.getImportedEndpoint()));
     }
 
-    private boolean alreadyImported(EndpointDescription endpoint, Set<ImportRegistration> importRegistrations) {
-        for (ImportRegistration ir : importRegistrations) {
-            final ImportReference importReference = ir.getImportReference();
-            if (importReference == null) {
-                LOG.debug("ImportRegistration {} already closed", ir);
-                continue;
-            }
-            if (endpoint.equals(importReference.getImportedEndpoint())) {
-                return true;
-            }
-        }
-        return false;
+    private void importAddedServices(String filter) {
+        Set<EndpointDescription> possible = importPossibilities.get(filter);
+        Set<ImportRegistration> imported = importedServices.get(filter);
+        possible.stream()
+            .filter(endpoint -> !isImported(imported, endpoint)) // filter out already imported
+            .forEach(endpoint -> importService(filter, endpoint)); // import the new endpoints
+    }
+
+    private void unimportRemovedServices(String filter) {
+        Set<EndpointDescription> possible = importPossibilities.get(filter);
+        Set<ImportRegistration> imported = importedServices.get(filter);
+        imported.stream()
+            .map(ImportRegistration::getImportReference)
+            .filter(ir -> ir != null && !possible.contains(ir.getImportedEndpoint())) // filter out possibles
+            .forEach(this::unimportService); // unimport the non-possibles
     }
 
     /**
-     * Tries to import the service with each rsa until one import is successful
+     * Tries to import the service with each rsa until one import is successful.
      *
+     * @param filter the filter that matched the endpoint
      * @param endpoint endpoint to import
-     * @return import registration of the first successful import
      */
-    private ImportRegistration importService(EndpointDescription endpoint) {
+    private void importService(String filter, EndpointDescription endpoint) {
         for (RemoteServiceAdmin rsa : rsaSet) {
             ImportRegistration ir = rsa.importService(endpoint);
             if (ir != null) {
                 if (ir.getException() == null) {
                     LOG.debug("Service import was successful {}", ir);
-                    return ir;
+                    importedServices.put(filter, ir);
+                    return;
                 } else {
                     LOG.info("Error importing service " + endpoint, ir.getException());
                 }
             }
         }
-        return null;
     }
 
-    private void unImportForGoneEndpoints(String filter) {
-        Set<ImportRegistration> importRegistrations = importedServices.get(filter);
-        Set<EndpointDescription> endpoints = importPossibilities.get(filter);
-        for (ImportRegistration ir : importRegistrations) {
-            final ImportReference importReference = ir.getImportReference();
-            if (importReference == null) {
-                LOG.debug("Unable to get ImportReference for ImportRegistration {}: already closed", ir);
-                continue;
-            }
-            EndpointDescription endpoint = importReference.getImportedEndpoint();
-            if (endpoint == null) {
-                LOG.debug("Unable to get EndpointDescription of ImportReference for ImportRegistration {}: already closed", ir);
-                continue;
-            }
-            if (!endpoints.contains(endpoint)) {
-                unImport(importReference);
-            }
-        }
-    }
-
-    private void unImport(ImportReference ref) {
-        List<ImportRegistration> removed = new ArrayList<>();
-        Set<String> imported = importedServices.keySet();
-        for (String key : imported) {
-            for (ImportRegistration ir : importedServices.get(key)) {
-                final ImportReference importReference = ir.getImportReference();
-                if (importReference == null) {
-                    LOG.debug("Unable to get ImportReference for ImportRegistration {}: already closed", ir);
-                    continue;
-                }
-                if (importReference.equals(ref)) {
-                    removed.add(ir);
-                }
-            }
-        }
-        closeAll(removed);
-    }
-
-    private void closeAll(List<ImportRegistration> removed) {
-        for (ImportRegistration ir : removed) {
-            importedServices.remove(ir);
-            ir.close();
-        }
+    private void unimportService(ImportReference ref) {
+        importedServices.allValues().stream()
+            .filter(ir -> ref != null && ref.equals(ir.getImportReference()))
+            .forEach(ir -> {
+                importedServices.remove(ir);
+                ir.close();
+            });
     }
 
     @Override
@@ -240,17 +203,15 @@
                 importPossibilities.put(filter, endpoint);
                 break;
             case EndpointEvent.REMOVED:
+            case EndpointEvent.MODIFIED_ENDMATCH:
                 importPossibilities.remove(filter, endpoint);
                 break;
             case EndpointEvent.MODIFIED:
                 importPossibilities.remove(filter, endpoint);
                 importPossibilities.put(filter, endpoint);
                 break;
-            case EndpointEvent.MODIFIED_ENDMATCH:
-                importPossibilities.remove(filter, endpoint);
-                break;
         }
-        triggerSynchronizeImports(filter);
+        synchronizeImportsAsync(filter);
     }
 
 }