blob: 332c94b11e4ddd96597cd517069ccd9fb4813d9f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.configuration.util;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import org.apache.ignite.configuration.ConfigurationProperty;
import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.notifications.ConfigurationListener;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.configuration.ConfigurationNode;
import org.apache.ignite.internal.configuration.DynamicConfiguration;
import org.apache.ignite.internal.configuration.DynamicProperty;
import org.apache.ignite.internal.configuration.NamedListConfiguration;
import org.apache.ignite.internal.configuration.tree.ConfigurationVisitor;
import org.apache.ignite.internal.configuration.tree.InnerNode;
import org.apache.ignite.internal.configuration.tree.NamedListNode;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.innerNodeVisitor;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.leafNodeVisitor;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.namedListNodeVisitor;
/** */
public class ConfigurationNotificationsUtil {
/**
* Recursively notifies all public configuration listeners, accumulating resulting futures in {@code futures} list.
* @param oldInnerNode Old configuration values root.
* @param newInnerNode New configuration values root.
* @param cfgNode Public configuration tree node corresponding to the current inner nodes.
* @param storageRevision Storage revision.
* @param futures Write-only list of futures to accumulate results.
*/
public static void notifyListeners(
InnerNode oldInnerNode,
InnerNode newInnerNode,
DynamicConfiguration<InnerNode, ?> cfgNode,
long storageRevision,
List<CompletableFuture<?>> futures
) {
assert !(cfgNode instanceof NamedListConfiguration);
if (oldInnerNode == null || oldInnerNode == newInnerNode)
return;
notifyPublicListeners(
cfgNode.listeners(),
oldInnerNode,
newInnerNode,
storageRevision,
futures,
ConfigurationListener::onUpdate
);
Map<String, ConfigurationProperty<?, ?>> cfgNodeMembers = cfgNode.members();
oldInnerNode.traverseChildren(new ConfigurationVisitor<Void>() {
/** {@inheritDoc} */
@Override public Void visitLeafNode(String key, Serializable oldLeaf) {
Serializable newLeaf = newInnerNode.traverseChild(key, leafNodeVisitor(), true);
if (newLeaf != oldLeaf) {
var dynProperty = (DynamicProperty<Serializable>)cfgNodeMembers.get(key);
notifyPublicListeners(
dynProperty.listeners(),
oldLeaf,
newLeaf,
storageRevision,
futures,
ConfigurationListener::onUpdate
);
}
return null;
}
/** {@inheritDoc} */
@Override public Void visitInnerNode(String key, InnerNode oldNode) {
InnerNode newNode = newInnerNode.traverseChild(key, innerNodeVisitor(), true);
var dynCfg = (DynamicConfiguration<InnerNode, ?>)cfgNodeMembers.get(key);
notifyListeners(oldNode, newNode, dynCfg, storageRevision, futures);
return null;
}
/** {@inheritDoc} */
@Override public <N extends InnerNode> Void visitNamedListNode(String key, NamedListNode<N> oldNamedList) {
var newNamedList = (NamedListNode<InnerNode>)newInnerNode.traverseChild(key, namedListNodeVisitor(), true);
if (newNamedList != oldNamedList) {
var namedListCfg = (NamedListConfiguration<?, InnerNode, ?>)cfgNodeMembers.get(key);
notifyPublicListeners(
namedListCfg.listeners(),
(NamedListView<InnerNode>)oldNamedList,
newNamedList,
storageRevision,
futures,
ConfigurationListener::onUpdate
);
// This is optimization, we could use "NamedListConfiguration#get" directly, but we don't want to.
List<String> oldNames = oldNamedList.namedListKeys();
List<String> newNames = newNamedList.namedListKeys();
Map<String, ConfigurationProperty<?, ?>> namedListCfgMembers = namedListCfg.touchMembers();
Set<String> created = new HashSet<>(newNames);
created.removeAll(oldNames);
Set<String> deleted = new HashSet<>(oldNames);
deleted.removeAll(newNames);
Map<String, String> renamed = new HashMap<>();
if (!created.isEmpty() && !deleted.isEmpty()) {
Map<String, String> createdIds = new HashMap<>();
for (String createdKey : created)
createdIds.put(newNamedList.internalId(createdKey), createdKey);
// Avoiding ConcurrentModificationException.
for (String deletedKey : Set.copyOf(deleted)) {
String internalId = oldNamedList.internalId(deletedKey);
String maybeRenamedKey = createdIds.get(internalId);
if (maybeRenamedKey == null)
continue;
deleted.remove(deletedKey);
created.remove(maybeRenamedKey);
renamed.put(deletedKey, maybeRenamedKey);
}
}
List<ConfigurationNamedListListener<InnerNode>> extListeners = namedListCfg.extendedListeners();
for (String name : created) {
notifyPublicListeners(
extListeners,
null,
newNamedList.get(name),
storageRevision,
futures,
ConfigurationNamedListListener::onCreate
);
touch((DynamicConfiguration<?, ?>)namedListCfg.members().get(name));
}
for (String name : deleted) {
notifyPublicListeners(
extListeners,
oldNamedList.get(name),
null,
storageRevision,
futures,
ConfigurationNamedListListener::onDelete
);
var deletedProp = (ConfigurationNode<N, ?>)namedListCfgMembers.get(name);
notifyPublicListeners(
deletedProp.listeners(),
oldNamedList.get(name),
null,
storageRevision,
futures,
ConfigurationListener::onUpdate
);
}
for (Map.Entry<String, String> entry : renamed.entrySet()) {
notifyPublicListeners(
extListeners,
oldNamedList.get(entry.getKey()),
newNamedList.get(entry.getValue()),
storageRevision,
futures,
(listener, evt) -> listener.onRename(entry.getKey(), entry.getValue(), evt)
);
}
for (String name : newNames) {
if (!oldNames.contains(name))
continue;
notifyPublicListeners(
extListeners,
oldNamedList.get(name),
newNamedList.get(name),
storageRevision,
futures,
ConfigurationListener::onUpdate
);
var dynCfg = (DynamicConfiguration<InnerNode, ?>)namedListCfgMembers.get(name);
notifyListeners(oldNamedList.get(name), newNamedList.get(name), dynCfg, storageRevision, futures);
}
}
return null;
}
}, true);
}
/**
* Invoke {@link ConfigurationListener#onUpdate(ConfigurationNotificationEvent)} on all passed listeners and put
* results in {@code futures}. Not recursively.
*
* @param listeners List o flisteners.
* @param oldVal Old configuration value.
* @param newVal New configuration value.
* @param storageRevision Storage revision.
* @param futures Write-only list of futures.
* @param updater Update closure to be invoked on the listener instance.
* @param <V> Type of the node.
* @param <L> Type of the configuration listener.
*/
private static <V, L extends ConfigurationListener<V>> void notifyPublicListeners(
List<? extends L> listeners,
V oldVal,
V newVal,
long storageRevision,
List<CompletableFuture<?>> futures,
BiFunction<L, ConfigurationNotificationEvent<V>, CompletableFuture<?>> updater
) {
ConfigurationNotificationEvent<V> evt = new ConfigurationNotificationEventImpl<>(
oldVal,
newVal,
storageRevision
);
for (L listener : listeners) {
try {
CompletableFuture<?> future = updater.apply(listener, evt);
assert future != null : updater;
if (future.isCompletedExceptionally() || future.isCancelled() || !future.isDone())
futures.add(future);
}
catch (Throwable t) {
futures.add(CompletableFuture.failedFuture(t));
}
}
}
/**
* Ensures that dynamic configuration tree is up to date and further notifications on it will be invoked correctly.
*
* @param cfg Dynamic configuration node instance.
*/
public static void touch(DynamicConfiguration<?, ?> cfg) {
cfg.touchMembers();
for (ConfigurationProperty<?, ?> value : cfg.members().values()) {
if (value instanceof DynamicConfiguration)
touch((DynamicConfiguration<?, ?>)value);
}
}
}