blob: 44a5a7a677f032e887a691c571888c5738365bba [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.karaf.cellar.config;
import org.apache.karaf.cellar.core.Configurations;
import org.apache.karaf.cellar.core.Group;
import org.apache.karaf.cellar.core.Synchronizer;
import org.apache.karaf.cellar.core.control.SwitchStatus;
import org.apache.karaf.cellar.core.event.EventProducer;
import org.apache.karaf.cellar.core.event.EventType;
import org.apache.karaf.features.BootFinished;
import org.osgi.framework.BundleContext;
import org.osgi.framework.InvalidSyntaxException;
import org.osgi.service.cm.Configuration;
import org.osgi.util.tracker.ServiceTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
/**
* The ConfigurationSynchronizer is called when Cellar starts or when a node joins a cluster group.
* The purpose is to synchronize local configurations with the configurations in the cluster groups.
*/
public class ConfigurationSynchronizer extends ConfigurationSupport implements Synchronizer {
private static final transient Logger LOGGER = LoggerFactory.getLogger(ConfigurationSynchronizer.class);
private EventProducer eventProducer;
public void setEventProducer(EventProducer eventProducer) {
this.eventProducer = eventProducer;
}
public void init(BundleContext bundleContext) {
// wait the end of Karaf boot process
ServiceTracker tracker = new ServiceTracker(bundleContext, BootFinished.class, null);
try {
tracker.waitForService(120000);
} catch (Exception e) {
LOGGER.warn("Can't start BootFinished service tracker", e);
}
if (groupManager == null)
return;
Set<Group> groups = groupManager.listLocalGroups();
if (groups != null && !groups.isEmpty()) {
for (Group group : groups) {
sync(group);
}
}
}
public void destroy() {
// nothing to do
}
/**
* Sync node and cluster states, depending of the sync policy.
*
* @param group the target cluster group.
*/
@Override
public void sync(Group group) {
String policy = getSyncPolicy(group);
if (policy == null) {
LOGGER.warn("CELLAR CONFIG: sync policy is not defined for cluster group {}", group.getName());
}
if (policy.equalsIgnoreCase("cluster")) {
LOGGER.debug("CELLAR CONFIG: sync policy set as 'cluster' for cluster group {}", group.getName());
LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull first)");
pull(group);
LOGGER.debug("CELLAR CONFIG: node is the first one in the cluster group, no pull");
LOGGER.debug("CELLAR CONFIG: updating cluster from the local node (push after)");
push(group);
} else if (policy.equalsIgnoreCase("node")) {
LOGGER.debug("CELLAR CONFIG: sync policy set as 'node' for cluster group {}", group.getName());
LOGGER.debug("CELLAR CONFIG: updating cluster from the local node (push first)");
push(group);
LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull after)");
pull(group);
} else if (policy.equalsIgnoreCase("clusterOnly")) {
LOGGER.debug("CELLAR CONFIG: sync policy set as 'clusterOnly' for cluster group " + group.getName());
LOGGER.debug("CELLAR CONFIG: updating node from the cluster (pull only)");
pull(group);
LOGGER.debug("CELLAR CONFIG: node is the first one in the cluster group, no pull");
} else if (policy.equalsIgnoreCase("nodeOnly")) {
LOGGER.debug("CELLAR CONFIG: sync policy set as 'nodeOnly' for cluster group " + group.getName());
LOGGER.debug("CELLAR CONFIG: updating cluster from the local node (push only)");
push(group);
} else {
LOGGER.debug("CELLAR CONFIG: sync policy set as 'disabled' for cluster group " + group.getName());
LOGGER.debug("CELLAR CONFIG: no sync");
}
}
/**
* Pull the configuration from a cluster group to update the local ones.
*
* @param group the cluster group where to get the configurations.
*/
public void pull(Group group) {
if (group != null) {
String groupName = group.getName();
LOGGER.debug("CELLAR CONFIG: pulling configurations from cluster group {}", groupName);
Map<String, Properties> clusterConfigurations = clusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + groupName);
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// get configurations on the cluster to update local configurations
for (String pid : clusterConfigurations.keySet()) {
if (isAllowed(group, Constants.CATEGORY, pid, EventType.INBOUND) && shouldReplicateConfig(clusterConfigurations.get(pid))) {
Dictionary clusterDictionary = clusterConfigurations.get(pid);
try {
// update the local configuration if needed
Configuration localConfiguration = findLocalConfiguration(pid, clusterDictionary);
if (localConfiguration == null) {
// Create new configuration
localConfiguration = createLocalConfiguration(pid, clusterDictionary);
}
Dictionary localDictionary = localConfiguration.getProperties();
if (localDictionary == null)
localDictionary = new Properties();
localDictionary = filter(localDictionary);
if (!equals(clusterDictionary, localDictionary) && canDistributeConfig(localDictionary) && shouldReplicateConfig(clusterDictionary)) {
LOGGER.debug("CELLAR CONFIG: updating configration {} on node", pid);
clusterDictionary = convertPropertiesFromCluster(clusterDictionary);
localConfiguration.update((Dictionary) clusterDictionary);
persistConfiguration(localConfiguration, clusterDictionary);
}
} catch (IOException ex) {
LOGGER.error("CELLAR CONFIG: failed to read local configuration", ex);
}
} else LOGGER.trace("CELLAR CONFIG: configuration with PID {} is marked BLOCKED INBOUND for cluster group {}", pid, groupName);
}
// cleanup the local configurations not present on the cluster if the node is not the first one in the cluster
if (clusterManager.listNodesByGroup(group).size() > 1) {
try {
Set<String> filenames = new HashSet();
for (Properties configuration : clusterConfigurations.values()) {
if (shouldReplicateConfig(configuration)) {
filenames.add(getKarafFilename(configuration));
}
}
filenames.remove(null);
for (Configuration configuration : configurationAdmin.listConfigurations(null)) {
String pid = configuration.getPid();
if (!clusterConfigurations.containsKey(pid) && !filenames.contains(getKarafFilename(configuration.getProperties())) && isAllowed(group, Constants.CATEGORY, pid, EventType.INBOUND)) {
LOGGER.debug("CELLAR CONFIG: deleting local configuration {} which is not present in cluster", pid);
deleteConfiguration(configuration);
}
}
} catch (Exception e) {
LOGGER.warn("Can't get local configurations", e);
}
}
} catch (Exception ex) {
LOGGER.error("CELLAR CONFIG: failed to read cluster configuration", ex);
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}
}
/**
* Push local configurations to a cluster group.
*
* @param group the cluster group where to update the configurations.
*/
public void push(Group group) {
if (eventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
LOGGER.warn("CELLAR CONFIG: cluster event producer is OFF");
return;
}
if (group != null) {
String groupName = group.getName();
LOGGER.debug("CELLAR CONFIG: pushing configurations to cluster group {}", groupName);
Map<String, Properties> clusterConfigurations = clusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + groupName);
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
Configuration[] localConfigurations;
try {
localConfigurations = configurationAdmin.listConfigurations(null);
// push local configurations to the cluster
for (Configuration localConfiguration : localConfigurations) {
String pid = localConfiguration.getPid();
// check if the pid is marked as local.
if (isAllowed(group, Constants.CATEGORY, pid, EventType.OUTBOUND)) {
Dictionary localDictionary = localConfiguration.getProperties();
localDictionary = filter(localDictionary);
if (!clusterConfigurations.containsKey(pid)) {
LOGGER.debug("CELLAR CONFIG: creating configuration pid {} on the cluster", pid);
// update cluster configurations
clusterConfigurations.put(pid, dictionaryToProperties(localDictionary));
// send cluster event
ClusterConfigurationEvent event = new ClusterConfigurationEvent(pid);
event.setSourceGroup(group);
event.setSourceNode(clusterManager.getNode());
event.setLocal(clusterManager.getNode());
eventProducer.produce(event);
} else {
Dictionary clusterDictionary = clusterConfigurations.get(pid);
if (!equals(clusterDictionary, localDictionary) && canDistributeConfig(localDictionary)) {
LOGGER.debug("CELLAR CONFIG: updating configuration pid {} on the cluster", pid);
// update cluster configurations
clusterConfigurations.put(pid, dictionaryToProperties(localDictionary));
// send cluster event
ClusterConfigurationEvent event = new ClusterConfigurationEvent(pid);
event.setSourceGroup(group);
event.setLocal(clusterManager.getNode());
event.setSourceNode(clusterManager.getNode());
eventProducer.produce(event);
}
}
} else
LOGGER.trace("CELLAR CONFIG: configuration with PID {} is marked BLOCKED OUTBOUND for cluster group {}", pid, groupName);
}
// clean configurations on the cluster not present locally
for (String pid : clusterConfigurations.keySet()) {
if (isAllowed(group, Constants.CATEGORY, pid, EventType.OUTBOUND)) {
boolean found = false;
for (Configuration configuration : configurationAdmin.listConfigurations(null)) {
if (configuration.getPid().equals(pid)) {
found = true;
break;
}
}
if (!found) {
clusterConfigurations.remove(pid);
}
}
}
} catch (IOException ex) {
LOGGER.error("CELLAR CONFIG: failed to read configuration (IO error)", ex);
} catch (InvalidSyntaxException ex) {
LOGGER.error("CELLAR CONFIG: failed to read configuration (invalid filter syntax)", ex);
}
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}
}
/**
* Get the configuration sync policy for the given cluster group.
*
* @param group the cluster group.
* @return the current configuration sync policy for the given cluster group.
*/
@Override
public String getSyncPolicy(Group group) {
String groupName = group.getName();
try {
Configuration configuration = configurationAdmin.getConfiguration(Configurations.GROUP, null);
Dictionary<String, Object> properties = configuration.getProperties();
if (properties != null) {
String propertyKey = groupName + Configurations.SEPARATOR + Constants.CATEGORY + Configurations.SEPARATOR + Configurations.SYNC;
if (properties.get(propertyKey) != null) {
return properties.get(propertyKey).toString();
}
}
} catch (IOException e) {
LOGGER.error("CELLAR CONFIG: error while retrieving the sync policy", e);
}
return "cluster";
}
}