Factor out diffing of imported and possible (#39)

* Factor out diffing of imported and possible

* Small fix
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java
new file mode 100644
index 0000000..4f7dc72
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.ImportReference;
+import org.osgi.service.remoteserviceadmin.ImportRegistration;
+
+public class ImportDiff {
+    private Set<EndpointDescription> possible;
+    private Set<ImportRegistration> imported;
+
+    public ImportDiff(Set<EndpointDescription> possible, Set<ImportRegistration> imported) {
+        this.possible = possible;
+        this.imported = imported;
+    }
+
+    public Stream<ImportReference> getRemoved() {
+        return imported.stream()
+                .map(ImportRegistration::getImportReference)
+                .filter(Objects::nonNull)
+                .filter(ir -> !possible.contains(ir.getImportedEndpoint()));
+    }
+    
+    public Stream<EndpointDescription> getAdded() {
+        Set<EndpointDescription> importedEndpoints = importedEndpoints();
+        return possible.stream()
+                .filter(not(importedEndpoints::contains));
+    }
+    
+    private Set<EndpointDescription> importedEndpoints() {
+        return imported.stream()
+            .map(ImportRegistration::getImportReference).filter(Objects::nonNull)
+            .map(ImportReference::getImportedEndpoint).filter(Objects::nonNull)
+            .collect(Collectors.toSet());
+    }
+
+    private static <T> Predicate<T> not(Predicate<T> t) {
+        return t.negate();
+    }
+}
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index 95d3aee..2543da9 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import org.apache.aries.rsa.topologymanager.NamedThreadFactory;
 import org.osgi.framework.BundleContext;
@@ -114,17 +115,20 @@
      * Synchronizes the actual imports with the possible imports for the given filter,
      * i.e. unimports previously imported endpoints that are no longer possible,
      * and imports new possible endpoints that are not already imported.
+     * 
+     * TODO but optional: if the service is already imported and the endpoint is still
+     * in the list of possible imports check if a "better" endpoint is now in the list
      *
      * @param filter the filter whose endpoints are synchronized
      */
     private void synchronizeImports(final String filter) {
         try {
-            // unimport endpoints that are no longer possible
-            unimportRemovedServices(filter);
-            // import new endpoints
-            importAddedServices(filter);
-            // TODO but optional: if the service is already imported and the endpoint is still
-            // in the list of possible imports check if a "better" endpoint is now in the list
+            ImportDiff diff = new ImportDiff(importPossibilities.get(filter), importedServices.get(filter));
+            diff.getRemoved()
+                .forEach(this::unimportService);
+            diff.getAdded()
+                .flatMap(this::importService)
+                .forEach(ir -> importedServices.put(filter, ir));
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
         }
@@ -132,54 +136,25 @@
     }
 
     /**
-     * Checks if an endpoint is included in a set of import registrations.
-     *
-     * @param imported the import registrations that may include the endpoint
-     * @param endpoint the endpoint to check
-     * @return whether the given endpoint is imported
-     */
-    private boolean isImported(Set<ImportRegistration> imported, EndpointDescription endpoint) {
-        return imported.stream()
-            .map(ImportRegistration::getImportReference)
-            .anyMatch(ir -> ir != null && endpoint.equals(ir.getImportedEndpoint()));
-    }
-
-    private void importAddedServices(String filter) {
-        Set<EndpointDescription> possible = importPossibilities.get(filter);
-        Set<ImportRegistration> imported = importedServices.get(filter);
-        possible.stream()
-            .filter(endpoint -> !isImported(imported, endpoint)) // filter out already imported
-            .forEach(endpoint -> importService(filter, endpoint)); // import the new endpoints
-    }
-
-    private void unimportRemovedServices(String filter) {
-        Set<EndpointDescription> possible = importPossibilities.get(filter);
-        Set<ImportRegistration> imported = importedServices.get(filter);
-        imported.stream()
-            .map(ImportRegistration::getImportReference)
-            .filter(ir -> ir != null && !possible.contains(ir.getImportedEndpoint())) // filter out possibles
-            .forEach(this::unimportService); // unimport the non-possibles
-    }
-
-    /**
      * Tries to import the service with each rsa until one import is successful.
      *
      * @param filter the filter that matched the endpoint
      * @param endpoint endpoint to import
+     * @return 
      */
-    private void importService(String filter, EndpointDescription endpoint) {
+    private Stream<ImportRegistration> importService(EndpointDescription endpoint) {
         for (RemoteServiceAdmin rsa : rsaSet) {
             ImportRegistration ir = rsa.importService(endpoint);
             if (ir != null) {
                 if (ir.getException() == null) {
                     LOG.debug("Service import was successful {}", ir);
-                    importedServices.put(filter, ir);
-                    return;
+                    return Stream.of(ir);
                 } else {
                     LOG.info("Error importing service " + endpoint, ir.getException());
                 }
             }
         }
+        return Stream.empty();
     }
 
     private void unimportService(ImportReference ref) {