blob: 153da642be1b8e66151f658a8017139764f65c06 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.configuration;
import java.io.Serializable;
import java.lang.annotation.Annotation;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.annotation.ConfigurationType;
import org.apache.ignite.configuration.validation.ConfigurationValidationException;
import org.apache.ignite.configuration.validation.ValidationIssue;
import org.apache.ignite.configuration.validation.Validator;
import org.apache.ignite.internal.configuration.storage.ConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.Data;
import org.apache.ignite.internal.configuration.storage.StorageException;
import org.apache.ignite.internal.configuration.tree.ConfigurationSource;
import org.apache.ignite.internal.configuration.tree.ConstructableTreeNode;
import org.apache.ignite.internal.configuration.tree.InnerNode;
import org.apache.ignite.internal.configuration.validation.MemberKey;
import org.apache.ignite.internal.configuration.validation.ValidationUtil;
import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.configuration.util.ConfigurationFlattener.createFlattenedUpdatesMap;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.addDefaults;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.dropNulls;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.fillFromPrefixMap;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.toPrefixMap;
/**
* Class that handles configuration changes, by validating them, passing to storage and listening to storage updates.
*/
public abstract class ConfigurationChanger {
/** */
private final ForkJoinPool pool = new ForkJoinPool(2);
/** */
private final Map<String, RootKey<?, ?>> rootKeys = new TreeMap<>();
/** Map that has all the trees in accordance to their storages. */
private final Map<ConfigurationType, StorageRoots> storagesRootsMap = new ConcurrentHashMap<>();
/** Annotation classes mapped to validator objects. */
private Map<Class<? extends Annotation>, Set<Validator<?, ?>>> validators = new HashMap<>();
/**
* Closure interface to be used by the configuration changer. An instance of this closure is passed into the constructor and
* invoked every time when there's an update from any of the storages.
*/
@FunctionalInterface
public interface Notificator {
/**
* Invoked every time when the configuration is updated.
* @param oldRoot Old roots values. All these roots always belong to a single storage.
* @param newRoot New values for the same roots as in {@code oldRoot}.
* @param storageRevision Revision of the storage.
* @return Not-null future that must signify when processing is completed. Exceptional completion is not
* expected.
*/
@NotNull CompletableFuture<Void> notify(SuperRoot oldRoot, SuperRoot newRoot, long storageRevision);
}
/** Closure to execute when an update from the storage is received. */
private final Notificator notificator;
/**
* Immutable data container to store version and all roots associated with the specific storage.
*/
private static class StorageRoots {
/** Immutable forest, so to say. */
private final SuperRoot roots;
/** Version associated with the currently known storage state. */
private final long version;
/** Future that signifies update of current configuration. */
private final CompletableFuture<Void> changeFuture = new CompletableFuture<>();
/** */
private StorageRoots(SuperRoot roots, long version) {
this.roots = roots;
this.version = version;
}
}
/** Lazy annotations cache for configuration schema fields. */
private final Map<MemberKey, Annotation[]> cachedAnnotations = new ConcurrentHashMap<>();
/** Storage instances by their classes. Comes in handy when all you have is {@link RootKey}. */
private final Map<ConfigurationType, ConfigurationStorage> storageInstances = new HashMap<>();
/**
* @param notificator Closure to execute when update from the storage is received.
*/
public ConfigurationChanger(Notificator notificator) {
this.notificator = notificator;
}
/**
* Adds a single validator instance.
* @param annotationType Annotation type for validated fields.
* @param validator Validator instance for this annotation.
* @param <A> Annotation type.
*/
public <A extends Annotation> void addValidator(Class<A> annotationType, Validator<A, ?> validator) {
validators
.computeIfAbsent(annotationType, a -> new HashSet<>())
.add(validator);
}
/**
* Adds multiple validators instances.
* @param annotationType Annotation type for validated fields.
* @param validators Set of validator instancec for this annotation.
*/
public void addValidators(Class<? extends Annotation> annotationType, Set<Validator<? extends Annotation, ?>> validators) {
this.validators
.computeIfAbsent(annotationType, a -> new HashSet<>())
.addAll(validators);
}
/**
* Registers an additional root key.
* @param rootKey Root key instance.
*/
public void addRootKey(RootKey<?, ?> rootKey) {
assert !storageInstances.containsKey(rootKey.type());
rootKeys.put(rootKey.key(), rootKey);
}
/**
* Created new {@code Node} object that corresponds to passed root keys root configuration node.
* @param rootKey Root key.
* @return New {@link InnerNode} instance that represents root.
*/
public abstract InnerNode createRootNode(RootKey<?, ?> rootKey);
/**
* Utility method to create {@link SuperRoot} parameter value.
* @return Function that creates root node by root name or returns {@code null} if root name is not found.
*/
@NotNull private Function<String, InnerNode> rootCreator() {
return key -> {
RootKey<?, ?> rootKey = rootKeys.get(key);
return rootKey == null ? null : createRootNode(rootKey);
};
}
/**
* Registers a storage.
* @param configurationStorage Configuration storage instance.
*/
// ConfigurationChangeException, really?
public void register(ConfigurationStorage configurationStorage) throws ConfigurationChangeException {
storageInstances.put(configurationStorage.type(), configurationStorage);
Set<RootKey<?, ?>> storageRootKeys = rootKeys.values().stream().filter(
rootKey -> configurationStorage.type() == rootKey.type()
).collect(Collectors.toSet());
Data data;
try {
data = configurationStorage.readAll();
}
catch (StorageException e) {
throw new ConfigurationChangeException("Failed to initialize configuration: " + e.getMessage(), e);
}
SuperRoot superRoot = new SuperRoot(rootCreator());
Map<String, ?> dataValuesPrefixMap = toPrefixMap(data.values());
for (RootKey<?, ?> rootKey : storageRootKeys) {
Map<String, ?> rootPrefixMap = (Map<String, ?>)dataValuesPrefixMap.get(rootKey.key());
InnerNode rootNode = createRootNode(rootKey);
if (rootPrefixMap != null)
fillFromPrefixMap(rootNode, rootPrefixMap);
superRoot.addRoot(rootKey, rootNode);
}
StorageRoots storageRoots = new StorageRoots(superRoot, data.changeId());
storagesRootsMap.put(configurationStorage.type(), storageRoots);
configurationStorage.registerConfigurationListener(changedEntries -> updateFromListener(
configurationStorage.type(),
changedEntries
));
}
/**
* Initializes the configuration storage - reads data and sets default values for missing configuration properties.
* @param storageType Storage type.
* @throws ConfigurationValidationException If configuration validation failed.
* @throws ConfigurationChangeException If configuration framework failed to add default values and save them to
* storage.
*/
public void initialize(ConfigurationType storageType) throws ConfigurationValidationException, ConfigurationChangeException {
ConfigurationStorage configurationStorage = storageInstances.get(storageType);
assert configurationStorage != null : storageType;
try {
ConfigurationSource defaultsCfgSource = new ConfigurationSource() {
@Override public void descend(ConstructableTreeNode node) {
addDefaults((InnerNode)node);
}
};
changeInternally(defaultsCfgSource, configurationStorage).get();
}
catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof ConfigurationValidationException)
throw (ConfigurationValidationException)cause;
if (cause instanceof ConfigurationChangeException)
throw (ConfigurationChangeException)cause;
throw new ConfigurationChangeException(
"Failed to write defalut configuration values into the storage " + configurationStorage.getClass(), e
);
}
catch (InterruptedException e) {
throw new ConfigurationChangeException(
"Failed to initialize configuration storage " + configurationStorage.getClass(), e
);
}
}
/**
* Changes the configuration.
* @param source Configuration source to create patch from.
* @param storage Expected storage for the changes. If null, a derived storage will be used
* unconditionaly.
* @return Future that is completed on change completion.
*/
public CompletableFuture<Void> change(
ConfigurationSource source,
@Nullable ConfigurationStorage storage
) {
Set<ConfigurationType> storagesTypes = new HashSet<>();
ConstructableTreeNode collector = new ConstructableTreeNode() {
@Override public void construct(String key, ConfigurationSource src) throws NoSuchElementException {
RootKey<?, ?> rootKey = rootKeys.get(key);
if (rootKey == null)
throw new NoSuchElementException(key);
storagesTypes.add(rootKey.type());
}
@Override public ConstructableTreeNode copy() {
throw new UnsupportedOperationException("copy");
}
};
source.reset();
source.descend(collector);
assert !storagesTypes.isEmpty();
if (storagesTypes.size() != 1) {
return CompletableFuture.failedFuture(
new ConfigurationChangeException(
"Cannot handle change request with configuration patches belonging to different storages."
)
);
}
ConfigurationStorage actualStorage = storageInstances.get(storagesTypes.iterator().next());
if (storage != null && storage != actualStorage) {
return CompletableFuture.failedFuture(
new ConfigurationChangeException("Mismatched storage passed.")
);
}
return changeInternally(source, actualStorage);
}
/** Stop component. */
public void stop() {
pool.shutdownNow();
for (StorageRoots storageRoots : storagesRootsMap.values())
storageRoots.changeFuture.completeExceptionally(new NodeStoppingException());
}
/**
* Get root node by root key. Subject to revisiting.
*
* @param rootKey Root key.
* @return Root node.
*/
public InnerNode getRootNode(RootKey<?, ?> rootKey) {
return storagesRootsMap.get(rootKey.type()).roots.getRoot(rootKey);
}
/**
* @return Super root chat contains roots belonging to all storages.
*/
public SuperRoot mergedSuperRoot() {
return new SuperRoot(rootCreator(), storagesRootsMap.values().stream().map(roots -> roots.roots).collect(toList()));
}
/**
* Internal configuration change method that completes provided future.
*
* @param src Configuration source.
* @param storage Storage instance.
* @return fut Future that will be completed after changes are written to the storage.
*/
private CompletableFuture<Void> changeInternally(
ConfigurationSource src,
ConfigurationStorage storage
) {
StorageRoots storageRoots = storagesRootsMap.get(storage.type());
return CompletableFuture
.supplyAsync(() -> {
SuperRoot curRoots = storageRoots.roots;
SuperRoot changes = curRoots.copy();
src.reset();
src.descend(changes);
addDefaults(changes);
Map<String, Serializable> allChanges = createFlattenedUpdatesMap(curRoots, changes);
// Unlikely but still possible.
if (allChanges.isEmpty())
return null;
dropNulls(changes);
List<ValidationIssue> validationIssues = ValidationUtil.validate(
curRoots,
changes,
this::getRootNode,
cachedAnnotations,
validators
);
if (!validationIssues.isEmpty())
throw new ConfigurationValidationException(validationIssues);
return allChanges;
}, pool)
.thenCompose(allChanges -> {
if (allChanges == null)
return completedFuture(null);
return storage.write(allChanges, storageRoots.version)
.thenCompose(casResult -> {
if (casResult)
return storageRoots.changeFuture;
else
return storageRoots.changeFuture.thenCompose(v -> changeInternally(src, storage));
})
.exceptionally(throwable -> {
throw new ConfigurationChangeException("Failed to change configuration", throwable);
});
});
}
/**
* Updates configuration from storage listener.
*
* @param storageType Type of the storage that propagated these changes.
* @param changedEntries Changed data.
* @return Future that signifies update completion.
*/
private CompletableFuture<Void> updateFromListener(
ConfigurationType storageType,
Data changedEntries
) {
StorageRoots oldStorageRoots = this.storagesRootsMap.get(storageType);
Map<String, ?> dataValuesPrefixMap = toPrefixMap(changedEntries.values());
compressDeletedEntries(dataValuesPrefixMap);
SuperRoot oldSuperRoot = oldStorageRoots.roots;
SuperRoot newSuperRoot = oldSuperRoot.copy();
fillFromPrefixMap(newSuperRoot, dataValuesPrefixMap);
long newChangeId = changedEntries.changeId();
StorageRoots newStorageRoots = new StorageRoots(newSuperRoot, newChangeId);
storagesRootsMap.put(storageType, newStorageRoots);
return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId)
.whenComplete((v, t) -> {
if (t == null)
oldStorageRoots.changeFuture.complete(null);
else
oldStorageRoots.changeFuture.completeExceptionally(t);
});
}
/**
* "Compress" prefix map - this means that deleted named list elements will be represented as a single {@code null}
* objects instead of a number of nullified configuration leaves.
*
* @param prefixMap Prefix map, constructed from the storage notification data or its subtree.
*/
private void compressDeletedEntries(Map<String, ?> prefixMap) {
// Here we basically assume that if prefix subtree contains single null child then all its childrens are nulls.
// Replace all such elements will nulls, signifying that these are deleted named list elements.
prefixMap.replaceAll((key, value) ->
value instanceof Map && ((Map<?, ?>)value).containsValue(null) ? null : value
);
// Continue recursively.
for (Object value : prefixMap.values()) {
if (value instanceof Map)
compressDeletedEntries((Map<String, ?>)value);
}
}
}