IGNITE-15297 Removed spin-wait loop in ConfigurationChanger (#275)
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index 96eb0a4..153da64 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -45,6 +45,7 @@
import org.apache.ignite.internal.configuration.tree.InnerNode;
import org.apache.ignite.internal.configuration.validation.MemberKey;
import org.apache.ignite.internal.configuration.validation.ValidationUtil;
+import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -102,6 +103,9 @@
/** Version associated with the currently known storage state. */
private final long version;
+ /** Future that signifies update of current configuration. */
+ private final CompletableFuture<Void> changeFuture = new CompletableFuture<>();
+
/** */
private StorageRoots(SuperRoot roots, long version) {
this.roots = roots;
@@ -317,6 +321,9 @@
/** Stop component. */
public void stop() {
pool.shutdownNow();
+
+ for (StorageRoots storageRoots : storagesRootsMap.values())
+ storageRoots.changeFuture.completeExceptionally(new NodeStoppingException());
}
/**
@@ -338,6 +345,7 @@
/**
* Internal configuration change method that completes provided future.
+ *
* @param src Configuration source.
* @param storage Storage instance.
* @return fut Future that will be completed after changes are written to the storage.
@@ -383,27 +391,18 @@
}, pool)
.thenCompose(allChanges -> {
if (allChanges == null)
- return completedFuture(true);
+ return completedFuture(null);
+
return storage.write(allChanges, storageRoots.version)
+ .thenCompose(casResult -> {
+ if (casResult)
+ return storageRoots.changeFuture;
+ else
+ return storageRoots.changeFuture.thenCompose(v -> changeInternally(src, storage));
+ })
.exceptionally(throwable -> {
throw new ConfigurationChangeException("Failed to change configuration", throwable);
});
- })
- .thenCompose(casResult -> {
- if (casResult)
- return CompletableFuture.completedFuture(null);
- else {
- try {
- // Is this ok to have a busy wait on concurrent configuration updates?
- // Maybe we'll fix it while implementing metastorage storage implementation.
- Thread.sleep(10);
- }
- catch (InterruptedException e) {
- return CompletableFuture.failedFuture(e);
- }
-
- return changeInternally(src, storage);
- }
});
}
@@ -435,11 +434,13 @@
storagesRootsMap.put(storageType, newStorageRoots);
- return notificator.notify(
- oldSuperRoot,
- newSuperRoot,
- newChangeId
- );
+ return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId)
+ .whenComplete((v, t) -> {
+ if (t == null)
+ oldStorageRoots.changeFuture.complete(null);
+ else
+ oldStorageRoots.changeFuture.completeExceptionally(t);
+ });
}
/**
@@ -450,16 +451,10 @@
*/
private void compressDeletedEntries(Map<String, ?> prefixMap) {
// Here we basically assume that if prefix subtree contains single null child then all its childrens are nulls.
- Set<String> keysForRemoval = prefixMap.entrySet().stream()
- .filter(entry ->
- entry.getValue() instanceof Map && ((Map<?, ?>)entry.getValue()).containsValue(null)
- )
- .map(Map.Entry::getKey)
- .collect(Collectors.toSet());
-
// Replace all such elements will nulls, signifying that these are deleted named list elements.
- for (String key : keysForRemoval)
- prefixMap.put(key, null);
+ prefixMap.replaceAll((key, value) ->
+ value instanceof Map && ((Map<?, ?>)value).containsValue(null) ? null : value
+ );
// Continue recursively.
for (Object value : prefixMap.values()) {