IGNITE-15501 Improvements in ConfigurationExtension - support for listeners and internal schema extensions (#334)

diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/BasicConfigurationSchema.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/BasicConfigurationSchema.java
new file mode 100644
index 0000000..0503f08
--- /dev/null
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/BasicConfigurationSchema.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.testframework;
+
+import org.apache.ignite.configuration.annotation.Config;
+import org.apache.ignite.configuration.annotation.Value;
+
+/**
+ * Configuration schema for {@link ConfigurationExtensionTest#notifications()}.
+ */
+@Config
+public class BasicConfigurationSchema {
+    /** */
+    @Value(hasDefault = true)
+    public int visible = 1;
+}
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java
index 6fe1d3c..b84c0be 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java
@@ -19,13 +19,17 @@
 
 import java.lang.reflect.Field;
 import java.lang.reflect.Parameter;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigObject;
 import org.apache.ignite.configuration.RootKey;
+import org.apache.ignite.internal.configuration.DynamicConfiguration;
 import org.apache.ignite.internal.configuration.DynamicConfigurationChanger;
 import org.apache.ignite.internal.configuration.RootInnerNode;
 import org.apache.ignite.internal.configuration.SuperRoot;
@@ -33,8 +37,11 @@
 import org.apache.ignite.internal.configuration.hocon.HoconConverter;
 import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
 import org.apache.ignite.internal.configuration.tree.InnerNode;
+import org.apache.ignite.internal.configuration.util.ConfigurationNotificationsUtil;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
+import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
 import org.junit.jupiter.api.extension.BeforeEachCallback;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.api.extension.ExtensionContext.Namespace;
@@ -44,8 +51,9 @@
 import org.junit.platform.commons.support.AnnotationSupport;
 import org.junit.platform.commons.support.HierarchyTraversalMode;
 
-import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL;
+import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.internalSchemaExtensions;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -54,31 +62,51 @@
  *
  * @see InjectConfiguration
  */
-public class ConfigurationExtension implements BeforeEachCallback, AfterEachCallback, ParameterResolver {
+public class ConfigurationExtension implements BeforeEachCallback, AfterEachCallback,
+    BeforeAllCallback, AfterAllCallback, ParameterResolver {
     /** JUnit namespace for the extension. */
-    private static final Namespace namespace = Namespace.create(ConfigurationExtension.class);
+    private static final Namespace NAMESPACE = Namespace.create(ConfigurationExtension.class);
 
     /** Key to store {@link ConfigurationAsmGenerator} in {@link ExtensionContext.Store}. */
     private static final Object CGEN_KEY = new Object();
 
+    /** Key to store {@link ExecutorService} in {@link ExtensionContext.Store}. */
+    private static final Object POOL_KEY = new Object();
+
+    /** {@inheritDoc} */
+    @Override public void beforeAll(ExtensionContext context) throws Exception {
+        context.getStore(NAMESPACE).put(POOL_KEY, newSingleThreadExecutor());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void afterAll(ExtensionContext context) throws Exception {
+        ExecutorService pool = context.getStore(NAMESPACE).remove(POOL_KEY, ExecutorService.class);
+
+        pool.shutdownNow();
+    }
+
     /** {@inheritDoc} */
     @Override public void beforeEach(ExtensionContext context) throws Exception {
         ConfigurationAsmGenerator cgen = new ConfigurationAsmGenerator();
 
-        context.getStore(namespace).put(CGEN_KEY, cgen);
+        context.getStore(NAMESPACE).put(CGEN_KEY, cgen);
 
         Object testInstance = context.getRequiredTestInstance();
 
+        ExecutorService pool = context.getStore(NAMESPACE).get(POOL_KEY, ExecutorService.class);
+
         for (Field field : getMatchingFields(testInstance.getClass())) {
             field.setAccessible(true);
 
-            field.set(testInstance, cfgValue(field.getType(), field.getAnnotation(InjectConfiguration.class), cgen));
+            InjectConfiguration annotation = field.getAnnotation(InjectConfiguration.class);
+
+            field.set(testInstance, cfgValue(field.getType(), annotation, cgen, pool));
         }
     }
 
     /** {@inheritDoc} */
     @Override public void afterEach(ExtensionContext context) throws Exception {
-        context.getStore(namespace).remove(CGEN_KEY);
+        context.getStore(NAMESPACE).remove(CGEN_KEY);
     }
 
     /** {@inheritDoc} */
@@ -91,15 +119,18 @@
 
     /** {@inheritDoc} */
     @Override public Object resolveParameter(
-        ParameterContext parameterContext, ExtensionContext extensionContext
+        ParameterContext parameterContext,
+        ExtensionContext extensionContext
     ) throws ParameterResolutionException {
         Parameter parameter = parameterContext.getParameter();
 
         ConfigurationAsmGenerator cgen =
-            extensionContext.getStore(namespace).get(CGEN_KEY, ConfigurationAsmGenerator.class);
+            extensionContext.getStore(NAMESPACE).get(CGEN_KEY, ConfigurationAsmGenerator.class);
 
         try {
-            return cfgValue(parameter.getType(), parameter.getAnnotation(InjectConfiguration.class), cgen);
+            ExecutorService pool = extensionContext.getStore(NAMESPACE).get(POOL_KEY, ExecutorService.class);
+
+            return cfgValue(parameter.getType(), parameter.getAnnotation(InjectConfiguration.class), cgen, pool);
         }
         catch (ClassNotFoundException classNotFoundException) {
             throw new ParameterResolutionException(
@@ -115,6 +146,7 @@
      * @param type Type of the field or parameter. Class name must end with {@code Configuration}.
      * @param annotation Annotation present on the field or parameter.
      * @param cgen Runtime code generator associated with the extension instance.
+     * @param pool Single-threaded executor service to perform configuration changes.
      * @return Mock configuration instance.
      * @throws ClassNotFoundException If corresponding configuration schema class is not found.
      * @see #supportType(Class)
@@ -122,14 +154,14 @@
     private static Object cfgValue(
         Class<?> type,
         InjectConfiguration annotation,
-        ConfigurationAsmGenerator cgen
+        ConfigurationAsmGenerator cgen,
+        ExecutorService pool
     ) throws ClassNotFoundException {
         // Trying to find a schema class using configuration naming convention. This code won't work for inner Java
         // classes, extension is designed to mock actual configurations from public API to configure Ignite components.
         Class<?> schemaClass = Class.forName(type.getCanonicalName() + "Schema");
 
-        // Internal configuration extensions are not yet supported. This will probably be changed in the future.
-        cgen.compileRootSchema(schemaClass, Map.of());
+        cgen.compileRootSchema(schemaClass, internalSchemaExtensions(List.of(annotation.extensions())));
 
         // RootKey must be mocked, there's no way to instantiate it using a public constructor.
         RootKey rootKey = mock(RootKey.class);
@@ -150,26 +182,48 @@
         // Reference to the super root is required to make DynamicConfigurationChanger#change method atomic.
         var superRootRef = new AtomicReference<>(superRoot);
 
-        return cgen.instantiateCfg(rootKey, new DynamicConfigurationChanger() {
+        // Reference that's required for notificator.
+        AtomicReference<DynamicConfiguration<?, ?>> cfgRef = new AtomicReference();
+
+        cfgRef.set(cgen.instantiateCfg(rootKey, new DynamicConfigurationChanger() {
+            private AtomicInteger storageRev = new AtomicInteger();
+
             /** {@inheritDoc} */
             @Override public CompletableFuture<Void> change(ConfigurationSource change) {
-                while (true) {
+                return CompletableFuture.supplyAsync(() -> {
                     SuperRoot sr = superRootRef.get();
 
                     SuperRoot copy = sr.copy();
 
                     change.descend(copy);
 
-                    if (superRootRef.compareAndSet(sr, copy))
-                        return completedFuture(null);
-                }
+                    if (superRootRef.compareAndSet(sr, copy)) {
+                        List<CompletableFuture<?>> futures = new ArrayList<>();
+
+                        ConfigurationNotificationsUtil.notifyListeners(
+                            sr.getRoot(rootKey),
+                            copy.getRoot(rootKey),
+                            (DynamicConfiguration<InnerNode, ?>)cfgRef.get(),
+                            storageRev.incrementAndGet(),
+                            futures
+                        );
+
+                        return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
+                    }
+
+                    return change(change);
+                }, pool).thenCompose(Function.identity());
             }
 
             /** {@inheritDoc} */
             @Override public InnerNode getRootNode(RootKey<?, ?> rk) {
                 return superRootRef.get().getRoot(rk);
             }
-        });
+        }));
+
+        ConfigurationNotificationsUtil.touch(cfgRef.get());
+
+        return cfgRef.get();
     }
 
     /**
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtensionTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtensionTest.java
index 922b6f8..b29c68a 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtensionTest.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtensionTest.java
@@ -17,11 +17,17 @@
 
 package org.apache.ignite.internal.configuration.testframework;
 
-import java.util.concurrent.ExecutionException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.configuration.sample.DiscoveryConfiguration;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -37,17 +43,75 @@
     @Test
     public void injectConfiguration(
         @InjectConfiguration("mock.joinTimeout=100") DiscoveryConfiguration paramCfg
-    ) throws ExecutionException, InterruptedException {
+    ) throws Exception {
         assertEquals(5000, fieldCfg.joinTimeout().value());
 
         assertEquals(100, paramCfg.joinTimeout().value());
 
-        paramCfg.change(d -> d.changeJoinTimeout(200));
+        paramCfg.change(d -> d.changeJoinTimeout(200)).get(1, TimeUnit.SECONDS);
 
         assertEquals(200, paramCfg.joinTimeout().value());
 
-        paramCfg.joinTimeout().update(300);
+        paramCfg.joinTimeout().update(300).get(1, TimeUnit.SECONDS);
 
         assertEquals(300, paramCfg.joinTimeout().value());
     }
+
+    /** Tests that notifications work on injected configuration instance. */
+    @Test
+    public void notifications() throws Exception {
+        List<String> log = new ArrayList<>();
+
+        fieldCfg.listen(ctx -> {
+            log.add("update");
+
+            return completedFuture(null);
+        });
+
+        fieldCfg.joinTimeout().listen(ctx -> {
+            log.add("join");
+
+            return completedFuture(null);
+        });
+
+        fieldCfg.failureDetectionTimeout().listen(ctx -> {
+            log.add("failure");
+
+            return completedFuture(null);
+        });
+
+        fieldCfg.change(change -> change.changeJoinTimeout(1000_000)).get(1, TimeUnit.SECONDS);
+
+        assertEquals(List.of("update", "join"), log);
+
+        log.clear();
+
+        fieldCfg.failureDetectionTimeout().update(2000_000).get(1, TimeUnit.SECONDS);
+
+        assertEquals(List.of("update", "failure"), log);
+    }
+
+    /** Tests that internal configuration extensions work properly on injected configuration instance. */
+    @Test
+    public void internalConfiguration(
+        @InjectConfiguration(extensions = {ExtendedConfigurationSchema.class}) BasicConfiguration cfg
+    ) throws Exception {
+        assertThat(cfg, is(instanceOf(ExtendedConfiguration.class)));
+
+        assertEquals(1, cfg.visible().value());
+
+        assertEquals(2, ((ExtendedConfiguration)cfg).invisible().value());
+
+        cfg.change(change -> {
+            assertThat(change, is(instanceOf(ExtendedChange.class)));
+
+            change.changeVisible(3);
+
+            ((ExtendedChange)change).changeInvisible(4);
+        }).get(1, TimeUnit.SECONDS);
+
+        assertEquals(3, cfg.visible().value());
+
+        assertEquals(4, ((ExtendedConfiguration)cfg).invisible().value());
+    }
 }
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ExtendedConfigurationSchema.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ExtendedConfigurationSchema.java
new file mode 100644
index 0000000..dbfe30a
--- /dev/null
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ExtendedConfigurationSchema.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.testframework;
+
+import org.apache.ignite.configuration.annotation.InternalConfiguration;
+import org.apache.ignite.configuration.annotation.Value;
+
+/**
+ * Configuration schema for {@link ConfigurationExtensionTest#notifications()}.
+ */
+@InternalConfiguration
+public class ExtendedConfigurationSchema extends BasicConfigurationSchema {
+    /** */
+    @Value(hasDefault = true)
+    public int invisible = 2;
+}
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/InjectConfiguration.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/InjectConfiguration.java
index d339d1e..6409bfb 100644
--- a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/InjectConfiguration.java
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/InjectConfiguration.java
@@ -21,6 +21,7 @@
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import org.apache.ignite.configuration.annotation.InternalConfiguration;
 import org.apache.ignite.internal.configuration.ConfigurationChanger;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
 
@@ -33,7 +34,7 @@
  * values are not found.
  * <p/>
  * Although configuration instance is mutable, there's no {@link ConfigurationRegistry} and {@link ConfigurationChanger}
- * underneath. Listeners don't work either, be aware of that. Main point of the extension is to provide mocks.
+ * underneath. Main point of the extension is to provide mocks.
  *
  * @see ConfigurationExtension
  */
@@ -54,4 +55,12 @@
      * @return Initial configuration values in HOCON format.
      */
     String value() default "mock : {}";
+
+    /**
+     * Array of configuration schema extensions. Every class in the array must be annotated with
+     * {@link InternalConfiguration} and extend some public configuration.
+     *
+     * @return Array of configuration schema extensions.
+     */
+    Class<?>[] extensions() default {};
 }