blob: e89de39836fa208b81bbf56f8e6d30ac97e8bf0d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.configuration.distributed;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.ACTUALIZE;
import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.CLUSTER_WIDE_UPDATE;
import static org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor.AllowableAction.REGISTER;
/**
* Processor of distributed configuration.
*
* This class control lifecycle of actualization {@link DistributedProperty} across whole cluster.
*/
public class DistributedConfigurationProcessor extends GridProcessorAdapter implements DistributedPropertyDispatcher {
/** Prefix of key for distributed meta storage. */
private static final String DIST_CONF_PREFIX = "distrConf-";
/** Properties storage. */
private final Map<String, DistributedChangeableProperty> props = new ConcurrentHashMap<>();
/** Global metastorage. */
private volatile DistributedMetaStorage distributedMetastorage;
/** Max allowed action. All action with less ordinal than this also allowed. */
private volatile AllowableAction allowableAction = REGISTER;
/**
* @param ctx Kernal context.
*/
public DistributedConfigurationProcessor(GridKernalContext ctx) {
super(ctx);
}
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
GridInternalSubscriptionProcessor isp = ctx.internalSubscriptionProcessor();
isp.registerDistributedMetastorageListener(new DistributedMetastorageLifecycleListener() {
@Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
distributedMetastorage = ctx.distributedMetastorage();
//Listener for handling of cluster wide change of specific properties. Do local update.
distributedMetastorage.listen(
(key) -> key.startsWith(DIST_CONF_PREFIX),
(String key, Serializable oldVal, Serializable newVal) -> {
DistributedChangeableProperty prop = props.get(toPropertyKey(key));
if (prop != null)
prop.localUpdate(newVal);
}
);
//Switch to actualize action and actualize already registered properties.
switchCurrentActionTo(ACTUALIZE);
//Register and actualize properties waited for this service.
isp.getDistributedConfigurationListeners()
.forEach(listener -> listener.onReadyToRegister(DistributedConfigurationProcessor.this));
}
@Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
//Switch to cluster wide update action and do it on already registered properties.
switchCurrentActionTo(CLUSTER_WIDE_UPDATE);
isp.getDistributedConfigurationListeners()
.forEach(DistributedConfigurationLifecycleListener::onReadyToWrite);
}
});
}
/**
* Switching current action to given action and do all actions from old action to new one.
*
* @param to New action for switching on.
*/
private synchronized void switchCurrentActionTo(AllowableAction to) {
AllowableAction oldAct = allowableAction;
assert oldAct.ordinal() <= to.ordinal() : "Current action : " + oldAct + ", new action : " + to;
allowableAction = to;
for (AllowableAction action : AllowableAction.values()) {
if (action.ordinal() > oldAct.ordinal())
props.values().forEach(prop -> doAction(action, prop));
if (action == to)
break;
}
}
/**
* @param propKey Key of specific property.
* @return Property key for meta storage.
*/
private static String toMetaStorageKey(String propKey) {
return DIST_CONF_PREFIX + propKey;
}
/**
* @param metaStorageKey Key from meta storage.
* @return Original property key.
*/
private static String toPropertyKey(String metaStorageKey) {
return metaStorageKey.substring(DIST_CONF_PREFIX.length());
}
/** {@inheritDoc} */
@Override public <T extends DistributedChangeableProperty> void registerProperties(T... props) {
Arrays.stream(props)
.forEach(this::registerProperty);
}
/**
* Register property to processor and attach it if it possible.
*
* @param prop Property to attach to processor.
*/
@Override public <T extends Serializable> DistributedProperty<T> registerProperty(
DistributedChangeableProperty<T> prop
) {
doAllAllowableActions(prop);
return prop;
}
/**
* Execute all allowable actions until current action on given property.
*
* @param prop Property which action should be executed on.
*/
private void doAllAllowableActions(DistributedChangeableProperty prop) {
for (AllowableAction action : AllowableAction.values()) {
doAction(action, prop);
if (action == allowableAction)
break;
}
}
/**
* Do one given action on given property.
*
* @param act Action to execute.
* @param prop Property which action should be execute on.
*/
private void doAction(AllowableAction act, DistributedChangeableProperty prop) {
switch (act) {
case REGISTER:
doRegister(prop);
break;
case ACTUALIZE:
doActualize(prop);
break;
case CLUSTER_WIDE_UPDATE:
doClusterWideUpdate(prop);
break;
}
}
/**
* Do register action on given property.
*
* Bind property with this processor for furthter actualizing.
*
* @param prop Property which action should be execute on.
*/
private void doRegister(DistributedChangeableProperty prop) {
if (props.containsKey(prop.getName()))
throw new IllegalArgumentException("Property already exists : " + prop.getName());
props.put(prop.getName(), prop);
prop.onAttached();
}
/**
* Do actualize action on given property.
*
* Read actual value from metastore and set it to local property.
*
* @param prop Property which action should be execute on.
*/
private void doActualize(DistributedChangeableProperty prop) {
Serializable readVal = null;
try {
readVal = distributedMetastorage.read(toMetaStorageKey(prop.getName()));
}
catch (IgniteCheckedException e) {
log.error("Can not read value of property '" + prop.getName() + "'", e);
}
prop.localUpdate(readVal);
}
/**
* Do cluster wide action on given property.
*
* Set closure for cluster wide update action to given property.
*
* @param prop Property which action should be execute on.
*/
private void doClusterWideUpdate(DistributedChangeableProperty prop) {
prop.onReadyForUpdate(new PropertyUpdateClosure() {
@Override public GridFutureAdapter<?> update(String key, Serializable newValue)
throws IgniteCheckedException {
return distributedMetastorage.writeAsync(toMetaStorageKey(key), newValue);
}
@Override public GridFutureAdapter<?> casUpdate(
String key,
Serializable expectedValue,
Serializable newValue
) throws IgniteCheckedException {
return distributedMetastorage.compareAndSetAsync(toMetaStorageKey(key), expectedValue, newValue);
}
});
}
/**
* This enum determinate what is action allowable for proccessor in current moment.
*
* Order is important. Each next action allowable all previous actions. Current action can be changed only from
* previous to next .
*/
enum AllowableAction {
/**
* Only registration allowed. Actualization property from metastore and cluster wide update aren't allowed.
*/
REGISTER,
/**
* Registration and actualization property from metastore are allowed. Cluster wide update isn't allowed.
*/
ACTUALIZE,
/**
* All of below are allowed.
*/
CLUSTER_WIDE_UPDATE;
}
}