// blob: 72500b988ca146d7584e57b3e3e785c0618f02f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.configuration;
import static org.apache.ignite.internal.configuration.notifications.ConfigurationNotifier.notifyListeners;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.checkConfigurationType;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.innerNodeVisitor;
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.ignite.configuration.ConfigurationTree;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.SuperRootChange;
import org.apache.ignite.configuration.notifications.ConfigurationListener;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.configuration.ConfigurationChanger.ConfigurationUpdateListener;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
import org.apache.ignite.internal.configuration.tree.ConfigurationVisitor;
import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode;
import org.apache.ignite.internal.configuration.tree.InnerNode;
import org.apache.ignite.internal.configuration.validation.ConfigurationValidator;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.jetbrains.annotations.Nullable;
/**
 * Configuration registry.
 *
 * <p>Keeps the generated configuration implementation of every registered root key and forwards lifecycle, change and notification
 * requests to the underlying {@link ConfigurationChanger}.
 */
public class ConfigurationRegistry implements IgniteComponent {
    /** Logger of this class. */
    private static final IgniteLogger LOG = Loggers.forClass(ConfigurationRegistry.class);

    /** Generated configuration implementations, keyed by {@link RootKey#key}. */
    private final Map<String, DynamicConfiguration<?, ?>> configs = new HashMap<>();

    /** Handler that every configuration change is delegated to. */
    private final ConfigurationChanger changer;

    /**
     * Creates the registry: verifies the configuration type, builds the changer and instantiates one configuration per root key.
     *
     * @param rootKeys Configuration root keys.
     * @param storage Configuration storage.
     * @param generator Configuration tree generator, used to instantiate both root nodes and configuration implementations.
     * @param configurationValidator Configuration validator.
     * @throws IllegalArgumentException If the configuration type of the root keys is not equal to the storage type, or if the schema or its
     *      extensions are not valid.
     */
    public ConfigurationRegistry(
            Collection<RootKey<?, ?>> rootKeys,
            ConfigurationStorage storage,
            ConfigurationTreeGenerator generator,
            ConfigurationValidator configurationValidator
    ) {
        checkConfigurationType(rootKeys, storage);

        ConfigurationUpdateListener updateListener = notificationUpdateListener();

        changer = new ConfigurationChanger(updateListener, rootKeys, storage, configurationValidator) {
            /** Root nodes are produced by the tree generator from the schema class of the root key. */
            @Override
            public InnerNode createRootNode(RootKey<?, ?> rootKey) {
                return generator.instantiateNode(rootKey.schemaClass());
            }
        };

        // Register the generated configuration of each root under the root's key.
        for (RootKey<?, ?> rootKey : rootKeys) {
            DynamicConfiguration<?, ?> rootConfiguration = generator.instantiateCfg(rootKey, changer);

            configs.put(rootKey.key(), rootConfiguration);
        }
    }

    /** {@inheritDoc} */
    @Override
    public CompletableFuture<Void> startAsync() {
        changer.start();

        return nullCompletedFuture();
    }

    /** {@inheritDoc} */
    @Override
    public CompletableFuture<Void> stopAsync() {
        changer.stop();

        return nullCompletedFuture();
    }

    /**
     * Returns a future that resolves after the defaults are persisted to the storage.
     *
     * @return Future supplied by the configuration changer.
     */
    public CompletableFuture<Void> onDefaultsPersisted() {
        return changer.onDefaultsPersisted();
    }

    /**
     * Initializes the configuration with the given source. Intended only for the initial setup of the configuration: the provided
     * source is applied only if the storage is empty, and it is saved along with the defaults. This method must be called before
     * {@link #startAsync()}.
     *
     * @param configurationSource the configuration source to initialize with.
     */
    public void initializeConfigurationWith(ConfigurationSource configurationSource) {
        changer.initializeConfigurationWith(configurationSource);
    }

    /**
     * Looks up the public configuration tree registered under the key of the given root key.
     *
     * @param rootKey Root key.
     * @param <V> View type.
     * @param <C> Change type.
     * @param <T> Configuration tree type.
     * @return Public configuration tree, or {@code null} if nothing is registered under that key.
     */
    public <V, C, T extends ConfigurationTree<V, C>> T getConfiguration(RootKey<T, V> rootKey) {
        DynamicConfiguration<?, ?> registered = configs.get(rootKey.key());

        return (T) registered;
    }

    /**
     * Returns a copy of the configuration root.
     *
     * @return Copy of the super root currently held by the changer.
     */
    public SuperRoot superRoot() {
        return changer.superRoot().copy();
    }

    /**
     * Changes the configuration using the given source.
     *
     * @param changesSrc Configuration source to create patch from it.
     * @return Future that is completed on change completion.
     */
    public CompletableFuture<Void> change(ConfigurationSource changesSrc) {
        return changer.change(changesSrc);
    }

    /**
     * Changes the configuration through a closure. Gives the possibility to atomically update several root trees.
     *
     * @param change Closure that would consume a mutable super root instance.
     * @return Future that is completed on change completion.
     */
    public CompletableFuture<Void> change(Consumer<SuperRootChange> change) {
        // Adapter that hands the super root, wrapped as a SuperRootChange, to the closure.
        ConfigurationSource closureSource = new ConfigurationSource() {
            @Override
            public void descend(ConstructableTreeNode node) {
                assert node instanceof SuperRoot : "Descending always starts with super root: " + node;

                change.accept(new SuperRootChangeImpl((SuperRoot) node));
            }
        };

        return change(closureSource);
    }

    /**
     * Creates the listener that is passed to the changer and notifies configuration listeners on every configuration update.
     *
     * @return Update listener.
     */
    private ConfigurationUpdateListener notificationUpdateListener() {
        return new ConfigurationUpdateListener() {
            /**
             * Visits every child of the new super root, notifies the listeners of the matching configuration and returns a future that
             * combines all notification futures.
             */
            @Override
            public CompletableFuture<Void> onConfigurationUpdated(
                    @Nullable SuperRoot oldSuperRoot, SuperRoot newSuperRoot, long storageRevision, long notificationNumber
            ) {
                Collection<CompletableFuture<?>> pending = new ArrayList<>();

                ConfigurationVisitor<Void> rootNotifier = new ConfigurationVisitor<Void>() {
                    @Override
                    public Void visitInnerNode(Field field, String key, InnerNode newRoot) {
                        DynamicConfiguration<InnerNode, ?> config = (DynamicConfiguration<InnerNode, ?>) configs.get(key);

                        assert config != null : key;

                        InnerNode oldRoot = previousRoot(oldSuperRoot, key);

                        pending.addAll(notifyListeners(oldRoot, newRoot, config, storageRevision, notificationNumber));

                        return null;
                    }
                };

                newSuperRoot.traverseChildren(rootNotifier, true);

                return combineFutures(pending);
            }
        };
    }

    /**
     * Finds the root with the given key in the previous super root.
     *
     * @param oldSuperRoot Previous super root, {@code null} if there is none.
     * @param key Key of the root.
     * @return Previous root, or {@code null} if there is no previous super root.
     */
    private static @Nullable InnerNode previousRoot(@Nullable SuperRoot oldSuperRoot, String key) {
        if (oldSuperRoot == null) {
            return null;
        }

        InnerNode oldRoot = oldSuperRoot.traverseChild(key, innerNodeVisitor(), true);

        assert oldRoot != null : key;

        return oldRoot;
    }

    /**
     * Combines the notification futures into a single one, logging the failure of every future that completes exceptionally.
     *
     * @param futures Notification futures.
     * @return Future that completes when all of the given futures complete; an already completed one if there are no futures.
     */
    private static CompletableFuture<Void> combineFutures(Collection<CompletableFuture<?>> futures) {
        if (futures.isEmpty()) {
            return nullCompletedFuture();
        }

        CompletableFuture<?>[] observed = new CompletableFuture<?>[futures.size()];

        int idx = 0;

        for (CompletableFuture<?> future : futures) {
            // The extra stage exists only to log errors.
            observed[idx++] = future.whenComplete((res, failure) -> {
                if (failure != null) {
                    LOG.info("Failed to notify configuration listener", failure);
                }
            });
        }

        return CompletableFuture.allOf(observed);
    }

    /**
     * Notifies all listeners of the current configuration.
     *
     * <p>{@link ConfigurationListener#onUpdate} and {@link ConfigurationNamedListListener#onCreate} will be called and the value will
     * only be in {@link ConfigurationNotificationEvent#newValue}.
     *
     * @return Future that must signify when processing is completed.
     */
    public CompletableFuture<Void> notifyCurrentConfigurationListeners() {
        return changer.notifyCurrentConfigurationListeners();
    }

    /**
     * Returns the count of configuration listener notifications.
     *
     * <p>Monotonically increasing value that should be incremented each time an attempt is made to notify all listeners of the
     * configuration. Allows to guarantee that new listeners will be called only on the next notification of all configuration listeners.
     *
     * @return Notification count reported by the configuration changer.
     */
    public long notificationCount() {
        return changer.notificationCount();
    }
}