Factor out diffing of imported and possible (#39)
* Factor out diffing of imported and possible
* Small fix
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java
new file mode 100644
index 0000000..4f7dc72
--- /dev/null
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.aries.rsa.topologymanager.importer;
+
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.osgi.service.remoteserviceadmin.EndpointDescription;
+import org.osgi.service.remoteserviceadmin.ImportReference;
+import org.osgi.service.remoteserviceadmin.ImportRegistration;
+
+public class ImportDiff {
+ private Set<EndpointDescription> possible;
+ private Set<ImportRegistration> imported;
+
+ public ImportDiff(Set<EndpointDescription> possible, Set<ImportRegistration> imported) {
+ this.possible = possible;
+ this.imported = imported;
+ }
+
+ public Stream<ImportReference> getRemoved() {
+ return imported.stream()
+ .map(ImportRegistration::getImportReference)
+ .filter(Objects::nonNull)
+ .filter(ir -> !possible.contains(ir.getImportedEndpoint()));
+ }
+
+ public Stream<EndpointDescription> getAdded() {
+ Set<EndpointDescription> importedEndpoints = importedEndpoints();
+ return possible.stream()
+ .filter(not(importedEndpoints::contains));
+ }
+
+ private Set<EndpointDescription> importedEndpoints() {
+ return imported.stream()
+ .map(ImportRegistration::getImportReference).filter(Objects::nonNull)
+ .map(ImportReference::getImportedEndpoint).filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ }
+
+ private static <T> Predicate<T> not(Predicate<T> t) {
+ return t.negate();
+ }
+}
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index 95d3aee..2543da9 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -24,6 +24,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import org.apache.aries.rsa.topologymanager.NamedThreadFactory;
import org.osgi.framework.BundleContext;
@@ -114,17 +115,20 @@
* Synchronizes the actual imports with the possible imports for the given filter,
* i.e. unimports previously imported endpoints that are no longer possible,
* and imports new possible endpoints that are not already imported.
+ *
+ * TODO but optional: if the service is already imported and the endpoint is still
+ * in the list of possible imports check if a "better" endpoint is now in the list
*
* @param filter the filter whose endpoints are synchronized
*/
private void synchronizeImports(final String filter) {
try {
- // unimport endpoints that are no longer possible
- unimportRemovedServices(filter);
- // import new endpoints
- importAddedServices(filter);
- // TODO but optional: if the service is already imported and the endpoint is still
- // in the list of possible imports check if a "better" endpoint is now in the list
+ ImportDiff diff = new ImportDiff(importPossibilities.get(filter), importedServices.get(filter));
+ diff.getRemoved()
+ .forEach(this::unimportService);
+ diff.getAdded()
+ .flatMap(this::importService)
+ .forEach(ir -> importedServices.put(filter, ir));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
@@ -132,54 +136,25 @@
}
/**
- * Checks if an endpoint is included in a set of import registrations.
- *
- * @param imported the import registrations that may include the endpoint
- * @param endpoint the endpoint to check
- * @return whether the given endpoint is imported
- */
- private boolean isImported(Set<ImportRegistration> imported, EndpointDescription endpoint) {
- return imported.stream()
- .map(ImportRegistration::getImportReference)
- .anyMatch(ir -> ir != null && endpoint.equals(ir.getImportedEndpoint()));
- }
-
- private void importAddedServices(String filter) {
- Set<EndpointDescription> possible = importPossibilities.get(filter);
- Set<ImportRegistration> imported = importedServices.get(filter);
- possible.stream()
- .filter(endpoint -> !isImported(imported, endpoint)) // filter out already imported
- .forEach(endpoint -> importService(filter, endpoint)); // import the new endpoints
- }
-
- private void unimportRemovedServices(String filter) {
- Set<EndpointDescription> possible = importPossibilities.get(filter);
- Set<ImportRegistration> imported = importedServices.get(filter);
- imported.stream()
- .map(ImportRegistration::getImportReference)
- .filter(ir -> ir != null && !possible.contains(ir.getImportedEndpoint())) // filter out possibles
- .forEach(this::unimportService); // unimport the non-possibles
- }
-
- /**
* Tries to import the service with each rsa until one import is successful.
*
* @param filter the filter that matched the endpoint
* @param endpoint endpoint to import
+ * @return
*/
- private void importService(String filter, EndpointDescription endpoint) {
+ private Stream<ImportRegistration> importService(EndpointDescription endpoint) {
for (RemoteServiceAdmin rsa : rsaSet) {
ImportRegistration ir = rsa.importService(endpoint);
if (ir != null) {
if (ir.getException() == null) {
LOG.debug("Service import was successful {}", ir);
- importedServices.put(filter, ir);
- return;
+ return Stream.of(ir);
} else {
LOG.info("Error importing service " + endpoint, ir.getException());
}
}
}
+ return Stream.empty();
}
private void unimportService(ImportReference ref) {