ARIES-1944 - Make sure closed ImportRegistrations are removed (#40)
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java
index 4f7dc72..468d235 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/ImportDiff.java
@@ -37,19 +37,28 @@
this.imported = imported;
}
- public Stream<ImportReference> getRemoved() {
+ public Stream<ImportRegistration> getRemoved() {
return imported.stream()
- .map(ImportRegistration::getImportReference)
- .filter(Objects::nonNull)
- .filter(ir -> !possible.contains(ir.getImportedEndpoint()));
+ .filter(this::toRemove);
}
-
+
public Stream<EndpointDescription> getAdded() {
Set<EndpointDescription> importedEndpoints = importedEndpoints();
return possible.stream()
.filter(not(importedEndpoints::contains));
}
+ /**
+ * Checks if the import registration is not possible anymore or closed
+ *
+ * @param ireg registration to check
+ * @return
+ */
+ private boolean toRemove(ImportRegistration ireg) {
+ ImportReference iref = ireg != null ? ireg.getImportReference() : null;
+ return iref == null || !possible.contains(iref.getImportedEndpoint());
+ }
+
private Set<EndpointDescription> importedEndpoints() {
return imported.stream()
.map(ImportRegistration::getImportReference).filter(Objects::nonNull)
diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
index 2543da9..5e4baf8 100644
--- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
+++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java
@@ -85,7 +85,7 @@
}
// close all imports
importPossibilities.clear();
- importedServices.allValues().forEach(ir -> unimportService(ir.getImportReference()));
+ importedServices.allValues().forEach(this::unimportRegistration);
}
public void add(RemoteServiceAdmin rsa) {
@@ -99,8 +99,11 @@
@Override
public void remoteAdminEvent(RemoteServiceAdminEvent event) {
- if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION) {
- unimportService(event.getImportReference());
+ ImportReference ref = event.getImportReference();
+ if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION && ref != null) {
+ importedServices.allValues().stream()
+ .filter(ir -> ref.equals(ir.getImportReference()))
+ .forEach(this::unimportRegistration);
}
}
@@ -125,7 +128,7 @@
try {
ImportDiff diff = new ImportDiff(importPossibilities.get(filter), importedServices.get(filter));
diff.getRemoved()
- .forEach(this::unimportService);
+ .forEach(this::unimportRegistration);
diff.getAdded()
.flatMap(this::importService)
.forEach(ir -> importedServices.put(filter, ir));
@@ -156,16 +159,12 @@
}
return Stream.empty();
}
-
- private void unimportService(ImportReference ref) {
- importedServices.allValues().stream()
- .filter(ir -> ref != null && ref.equals(ir.getImportReference()))
- .forEach(ir -> {
- importedServices.remove(ir);
- ir.close();
- });
+
+ private void unimportRegistration(ImportRegistration reg) {
+ importedServices.remove(reg);
+ reg.close();
}
-
+
@Override
public void endpointChanged(EndpointEvent event, String filter) {
if (stopped) {