blob: 09223bfda00c9acf3054bc5a3e958f9addd333b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.configuration;
import java.io.Serializable;
import java.lang.annotation.Annotation;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.configuration.ConfigurationTree;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.annotation.Config;
import org.apache.ignite.configuration.annotation.ConfigurationRoot;
import org.apache.ignite.configuration.annotation.InternalConfiguration;
import org.apache.ignite.configuration.validation.Immutable;
import org.apache.ignite.configuration.validation.Max;
import org.apache.ignite.configuration.validation.Min;
import org.apache.ignite.configuration.validation.Validator;
import org.apache.ignite.internal.configuration.asm.ConfigurationAsmGenerator;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
import org.apache.ignite.internal.configuration.tree.ConfigurationVisitor;
import org.apache.ignite.internal.configuration.tree.InnerNode;
import org.apache.ignite.internal.configuration.tree.TraversableTreeNode;
import org.apache.ignite.internal.configuration.util.ConfigurationNotificationsUtil;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.configuration.util.KeyNotFoundException;
import org.apache.ignite.internal.configuration.validation.ImmutableValidator;
import org.apache.ignite.internal.configuration.validation.MaxValidator;
import org.apache.ignite.internal.configuration.validation.MinValidator;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.lang.IgniteLogger;
import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.configuration.util.ConfigurationNotificationsUtil.notifyListeners;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.checkConfigurationType;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.collectSchemas;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.innerNodeVisitor;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.internalSchemaExtensions;
/** */
public class ConfigurationRegistry implements IgniteComponent {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(ConfigurationRegistry.class);
/** Generated configuration implementations. Mapping: {@link RootKey#key} -> configuration implementation. */
private final Map<String, DynamicConfiguration<?, ?>> configs = new HashMap<>();
/** Root keys. */
private final Collection<RootKey<?, ?>> rootKeys;
/** Configuration change handler. */
private final ConfigurationChanger changer;
/** Configuration generator. */
private final ConfigurationAsmGenerator cgen = new ConfigurationAsmGenerator();
/**
* Constructor.
*
* @param rootKeys Configuration root keys.
* @param validators Validators.
* @param storage Configuration storage.
* @param internalSchemaExtensions Internal extensions ({@link InternalConfiguration})
* of configuration schemas ({@link ConfigurationRoot} and {@link Config}).
* @throws IllegalArgumentException If the configuration type of the root keys is not equal to the storage type,
* or if the schema or its extensions are not valid.
*/
public ConfigurationRegistry(
Collection<RootKey<?, ?>> rootKeys,
Map<Class<? extends Annotation>, Set<Validator<? extends Annotation, ?>>> validators,
ConfigurationStorage storage,
Collection<Class<?>> internalSchemaExtensions
) {
checkConfigurationType(rootKeys, storage);
Set<Class<?>> allSchemas = collectSchemas(rootKeys.stream().map(RootKey::schemaClass).collect(toSet()));
Map<Class<?>, Set<Class<?>>> extensions = internalSchemaExtensions(internalSchemaExtensions);
if (!allSchemas.containsAll(extensions.keySet())) {
Set<Class<?>> notInAllSchemas = extensions.keySet().stream()
.filter(not(allSchemas::contains))
.collect(toSet());
throw new IllegalArgumentException(
"Internal extensions for which no parent configuration schemes were found: " + notInAllSchemas
);
}
this.rootKeys = rootKeys;
Map<Class<? extends Annotation>, Set<Validator<?, ?>>> validators0 = new HashMap<>(validators);
validators0.computeIfAbsent(Min.class, a -> new HashSet<>()).add(new MinValidator());
validators0.computeIfAbsent(Max.class, a -> new HashSet<>()).add(new MaxValidator());
validators0.computeIfAbsent(Immutable.class, a -> new HashSet<>()).add(new ImmutableValidator());
changer = new ConfigurationChanger(this::notificator, rootKeys, validators0, storage) {
/** {@inheritDoc} */
@Override public InnerNode createRootNode(RootKey<?, ?> rootKey) {
return cgen.instantiateNode(rootKey.schemaClass());
}
};
rootKeys.forEach(rootKey -> {
cgen.compileRootSchema(rootKey.schemaClass(), extensions);
DynamicConfiguration<?, ?> cfg = cgen.instantiateCfg(rootKey, changer);
configs.put(rootKey.key(), cfg);
});
}
/** {@inheritDoc} */
@Override public void start() {
changer.start();
}
/** {@inheritDoc} */
@Override public void stop() {
changer.stop();
}
/**
* Initializes the configuration storage - reads data and sets default values for missing configuration properties.
*/
public void initializeDefaults() {
changer.initializeDefaults();
for (RootKey<?, ?> rootKey : rootKeys) {
DynamicConfiguration<?, ?> dynCfg = configs.get(rootKey.key());
ConfigurationNotificationsUtil.touch(dynCfg);
}
}
/**
* Gets the public configuration tree.
*
* @param rootKey Root key.
* @param <V> View type.
* @param <C> Change type.
* @param <T> Configuration tree type.
* @return Public configuration tree.
*/
public <V, C, T extends ConfigurationTree<V, C>> T getConfiguration(RootKey<T, V> rootKey) {
return (T)configs.get(rootKey.key());
}
/**
* Convert configuration subtree into a user-defined representation.
*
* @param path Path to configuration subtree. Can be empty, can't be {@code null}.
* @param visitor Visitor that will be applied to the subtree and build the representation.
* @param <T> Type of the representation.
* @return User-defined representation constructed by {@code visitor}.
* @throws IllegalArgumentException If {@code path} is not found in current configuration.
*/
public <T> T represent(List<String> path, ConfigurationVisitor<T> visitor) throws IllegalArgumentException {
SuperRoot superRoot = changer.superRoot();
Object node;
try {
node = ConfigurationUtil.find(path, superRoot, false);
}
catch (KeyNotFoundException e) {
throw new IllegalArgumentException(e.getMessage());
}
if (node instanceof TraversableTreeNode)
return ((TraversableTreeNode)node).accept(null, visitor);
assert node == null || node instanceof Serializable;
return visitor.visitLeafNode(null, (Serializable)node);
}
/**
* Change configuration.
*
* @param changesSrc Configuration source to create patch from it.
* @return Future that is completed on change completion.
*/
public CompletableFuture<Void> change(ConfigurationSource changesSrc) {
return changer.change(changesSrc);
}
/**
* Configuration change notifier.
*
* @param oldSuperRoot Old roots values. All these roots always belong to a single storage.
* @param newSuperRoot New values for the same roots as in {@code oldRoot}.
* @param storageRevision Revision of the storage.
* @return Future that must signify when processing is completed.
*/
private CompletableFuture<Void> notificator(SuperRoot oldSuperRoot, SuperRoot newSuperRoot, long storageRevision) {
List<CompletableFuture<?>> futures = new ArrayList<>();
newSuperRoot.traverseChildren(new ConfigurationVisitor<Void>() {
/** {@inheritDoc} */
@Override public Void visitInnerNode(String key, InnerNode newRoot) {
InnerNode oldRoot = oldSuperRoot.traverseChild(key, innerNodeVisitor(), true);
var cfg = (DynamicConfiguration<InnerNode, ?>)configs.get(key);
assert oldRoot != null && cfg != null : key;
if (oldRoot != newRoot)
notifyListeners(oldRoot, newRoot, cfg, storageRevision, futures);
return null;
}
}, true);
// Map futures is only for logging errors.
Function<CompletableFuture<?>, CompletableFuture<?>> mapping = fut -> fut.whenComplete((res, throwable) -> {
if (throwable != null)
LOG.error("Failed to notify configuration listener.", throwable);
});
CompletableFuture<?>[] resultFutures = futures.stream().map(mapping).toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(resultFutures);
}
}