IGNITE-15297 Removed spin-wait loop in ConfigurationChanger (#275)

diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index 96eb0a4..153da64 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -45,6 +45,7 @@
 import org.apache.ignite.internal.configuration.tree.InnerNode;
 import org.apache.ignite.internal.configuration.validation.MemberKey;
 import org.apache.ignite.internal.configuration.validation.ValidationUtil;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -102,6 +103,9 @@
         /** Version associated with the currently known storage state. */
         private final long version;
 
+        /** Future that signifies update of current configuration. */
+        private final CompletableFuture<Void> changeFuture = new CompletableFuture<>();
+
         /** */
         private StorageRoots(SuperRoot roots, long version) {
             this.roots = roots;
@@ -317,6 +321,9 @@
     /** Stop component. */
     public void stop() {
         pool.shutdownNow();
+
+        for (StorageRoots storageRoots : storagesRootsMap.values())
+            storageRoots.changeFuture.completeExceptionally(new NodeStoppingException());
     }
 
     /**
@@ -338,6 +345,7 @@
 
     /**
      * Internal configuration change method that completes provided future.
+     *
      * @param src Configuration source.
      * @param storage Storage instance.
      * @return fut Future that will be completed after changes are written to the storage.
@@ -383,27 +391,18 @@
             }, pool)
             .thenCompose(allChanges -> {
                 if (allChanges == null)
-                    return completedFuture(true);
+                    return completedFuture(null);
+
                 return storage.write(allChanges, storageRoots.version)
+                    .thenCompose(casResult -> {
+                        if (casResult)
+                            return storageRoots.changeFuture;
+                        else
+                            return storageRoots.changeFuture.thenCompose(v -> changeInternally(src, storage));
+                    })
                     .exceptionally(throwable -> {
                         throw new ConfigurationChangeException("Failed to change configuration", throwable);
                     });
-            })
-            .thenCompose(casResult -> {
-                if (casResult)
-                    return CompletableFuture.completedFuture(null);
-                else {
-                    try {
-                        // Is this ok to have a busy wait on concurrent configuration updates?
-                        // Maybe we'll fix it while implementing metastorage storage implementation.
-                        Thread.sleep(10);
-                    }
-                    catch (InterruptedException e) {
-                        return CompletableFuture.failedFuture(e);
-                    }
-
-                    return changeInternally(src, storage);
-                }
             });
     }
 
@@ -435,11 +434,13 @@
 
         storagesRootsMap.put(storageType, newStorageRoots);
 
-        return notificator.notify(
-            oldSuperRoot,
-            newSuperRoot,
-            newChangeId
-        );
+        return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId)
+            .whenComplete((v, t) -> {
+                if (t == null)
+                    oldStorageRoots.changeFuture.complete(null);
+                else
+                    oldStorageRoots.changeFuture.completeExceptionally(t);
+            });
     }
 
     /**
@@ -450,16 +451,10 @@
      */
     private void compressDeletedEntries(Map<String, ?> prefixMap) {
         // Here we basically assume that if prefix subtree contains single null child then all its childrens are nulls.
-        Set<String> keysForRemoval = prefixMap.entrySet().stream()
-            .filter(entry ->
-                entry.getValue() instanceof Map && ((Map<?, ?>)entry.getValue()).containsValue(null)
-            )
-            .map(Map.Entry::getKey)
-            .collect(Collectors.toSet());
-
         // Replace all such elements will nulls, signifying that these are deleted named list elements.
-        for (String key : keysForRemoval)
-            prefixMap.put(key, null);
+        prefixMap.replaceAll((key, value) ->
+            value instanceof Map && ((Map<?, ?>)value).containsValue(null) ? null : value
+        );
 
         // Continue recursively.
         for (Object value : prefixMap.values()) {