/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.unomi.services.impl.cluster;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.karaf.cellar.config.ClusterConfigurationEvent;
import org.apache.karaf.cellar.config.Constants;
import org.apache.karaf.cellar.core.*;
import org.apache.karaf.cellar.core.control.SwitchStatus;
import org.apache.karaf.cellar.core.event.Event;
import org.apache.karaf.cellar.core.event.EventProducer;
import org.apache.karaf.cellar.core.event.EventType;
import org.apache.unomi.api.ClusterNode;
import org.apache.unomi.api.services.ClusterService;
import org.apache.unomi.api.services.SchedulerService;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.*;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
 * Implementation of the cluster service interface, based on Apache Karaf Cellar
 */
public class ClusterServiceImpl implements ClusterService {
public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION = "org.apache.unomi.nodes";
public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS = "publicEndpoints";
public static final String KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS = "internalEndpoints";
private static final Logger logger = LoggerFactory.getLogger(ClusterServiceImpl.class);
PersistenceService persistenceService;
private ClusterManager karafCellarClusterManager;
private EventProducer karafCellarEventProducer;
private GroupManager karafCellarGroupManager;
private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME;
private ConfigurationAdmin osgiConfigurationAdmin;
private String publicAddress;
private String internalAddress;
private Map<String, Map<String,Serializable>> nodeSystemStatistics = new ConcurrentHashMap<>();
private Group group = null;
private SchedulerService schedulerService;
private long nodeStatisticsUpdateFrequency = 10000;
public void setPersistenceService(PersistenceService persistenceService) {
this.persistenceService = persistenceService;
}
public void setKarafCellarClusterManager(ClusterManager karafCellarClusterManager) {
this.karafCellarClusterManager = karafCellarClusterManager;
}
public void setKarafCellarEventProducer(EventProducer karafCellarEventProducer) {
this.karafCellarEventProducer = karafCellarEventProducer;
}
public void setKarafCellarGroupManager(GroupManager karafCellarGroupManager) {
this.karafCellarGroupManager = karafCellarGroupManager;
}
public void setKarafCellarGroupName(String karafCellarGroupName) {
this.karafCellarGroupName = karafCellarGroupName;
}
public void setOsgiConfigurationAdmin(ConfigurationAdmin osgiConfigurationAdmin) {
this.osgiConfigurationAdmin = osgiConfigurationAdmin;
}
public void setPublicAddress(String publicAddress) {
this.publicAddress = publicAddress;
}
public void setInternalAddress(String internalAddress) {
this.internalAddress = internalAddress;
}
public void setNodeStatisticsUpdateFrequency(long nodeStatisticsUpdateFrequency) {
this.nodeStatisticsUpdateFrequency = nodeStatisticsUpdateFrequency;
}
public void setSchedulerService(SchedulerService schedulerService) {
this.schedulerService = schedulerService;
}
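/**
 * Returns the last known system statistics (uptime, CPU load, load average) reported by each
 * cluster member, keyed by Karaf Cellar node identifier.
 */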
public Map<String, Map<String, Serializable>> getNodeSystemStatistics() {
return nodeSystemStatistics;
}
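/**
 * Initializes the cluster integration: makes sure the configured Cellar group exists (creating it
 * if needed), checks that the event producer is ON and that the nodes configuration PID is allowed
 * outbound, registers this node's public and internal endpoints in the shared cluster configuration,
 * and schedules the periodic publication of this node's system statistics.
 */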
public void init() {
if (karafCellarEventProducer != null && karafCellarClusterManager != null) {
boolean setupConfigOk = true;
group = karafCellarGroupManager.findGroupByName(karafCellarGroupName);
if (group == null) {
logger.warn("Cluster group " + karafCellarGroupName + " doesn't exist, creating it...");
group = karafCellarGroupManager.createGroup(karafCellarGroupName);
setupConfigOk = group != null;
}
// check if the producer is ON
if (setupConfigOk && karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
logger.error("Cluster event producer is OFF");
setupConfigOk = false;
}
// check if the config pid is allowed
if (setupConfigOk && !isClusterConfigPIDAllowed(group, Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, EventType.OUTBOUND)) {
logger.error("Configuration PID " + KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster group " + karafCellarGroupName);
setupConfigOk = false;
}
if (setupConfigOk) {
Map<String, Properties> configurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
Properties karafCellarClusterNodeConfiguration = configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
if (karafCellarClusterNodeConfiguration == null) {
karafCellarClusterNodeConfiguration = new Properties();
}
Map<String, String> publicEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + publicAddress);
publicEndpoints.put(thisKarafNode.getId(), publicAddress);
setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, publicEndpoints);
Map<String, String> internalEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + internalAddress);
internalEndpoints.put(thisKarafNode.getId(), internalAddress);
setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, internalEndpoints);
configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, karafCellarClusterNodeConfiguration);
ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
sendEvent(clusterConfigurationEvent);
}
TimerTask statisticsTask = new TimerTask() {
@Override
public void run() {
try {
updateSystemStats();
} catch (Throwable t) {
logger.error("Error updating system statistics", t);
}
}
};
schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(statisticsTask, 0, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
}
logger.info("Cluster service initialized.");
}
public void destroy() {
logger.info("Cluster service shutdown.");
}
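/**
 * Builds the list of cluster nodes from the Karaf Cellar node registry, resolving each node's public
 * and internal endpoints from the shared cluster configuration and attaching the latest reported
 * uptime, CPU load and load average when available.
 */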
@Override
public List<ClusterNode> getClusterNodes() {
Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String, ClusterNode>();
Set<org.apache.karaf.cellar.core.Node> karafCellarNodes = karafCellarClusterManager.listNodes();
org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
Map<String, Properties> clusterConfigurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
Properties karafCellarClusterNodeConfiguration = clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
Map<String, String> publicNodeEndpoints = new TreeMap<>();
Map<String, String> internalNodeEndpoints = new TreeMap<>();
if (karafCellarClusterNodeConfiguration != null) {
publicNodeEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + publicAddress);
internalNodeEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + internalAddress);
}
for (org.apache.karaf.cellar.core.Node karafCellarNode : karafCellarNodes) {
ClusterNode clusterNode = new ClusterNode();
String publicEndpoint = publicNodeEndpoints.get(karafCellarNode.getId());
if (publicEndpoint != null) {
clusterNode.setPublicHostAddress(publicEndpoint);
}
String internalEndpoint = internalNodeEndpoints.get(karafCellarNode.getId());
if (internalEndpoint != null) {
clusterNode.setInternalHostAddress(internalEndpoint);
}
Map<String,Serializable> nodeStatistics = nodeSystemStatistics.get(karafCellarNode.getId());
if (nodeStatistics != null) {
Long uptime = (Long) nodeStatistics.get("uptime");
if (uptime != null) {
clusterNode.setUptime(uptime);
}
Double systemCpuLoad = (Double) nodeStatistics.get("systemCpuLoad");
if (systemCpuLoad != null) {
clusterNode.setCpuLoad(systemCpuLoad);
}
List<Double> loadAverage = (List<Double>) nodeStatistics.get("systemLoadAverage");
if (loadAverage != null) {
Double[] loadAverageArray = loadAverage.toArray(new Double[loadAverage.size()]);
clusterNode.setLoadAverage(ArrayUtils.toPrimitive(loadAverageArray));
}
}
clusterNodes.put(karafCellarNode.getId(), clusterNode);
}
return new ArrayList<ClusterNode>(clusterNodes.values());
}
@Override
public void purge(Date date) {
persistenceService.purge(date);
}
@Override
public void purge(String scope) {
persistenceService.purge(scope);
}
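/**
 * Sends a Karaf Cellar event to the other members of the cluster group: the event is tagged with the
 * current source group and node before being handed to the Cellar event producer.
 */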
@Override
public void sendEvent(Serializable eventObject) {
Event event = (Event) eventObject;
event.setSourceGroup(group);
event.setSourceNode(karafCellarClusterManager.getNode());
karafCellarEventProducer.produce(event);
}
/**
* Check if a configuration is allowed.
*
* @param group the cluster group.
* @param category the configuration category constant.
* @param pid the configuration PID.
* @param type the cluster event type.
* @return true if the cluster event type is allowed, false otherwise.
*/
public boolean isClusterConfigPIDAllowed(Group group, String category, String pid, EventType type) {
CellarSupport support = new CellarSupport();
support.setClusterManager(this.karafCellarClusterManager);
support.setGroupManager(this.karafCellarGroupManager);
support.setConfigurationAdmin(this.osgiConfigurationAdmin);
return support.isAllowed(group, category, pid, type);
}
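/**
 * Endpoints are stored in the cluster configuration as a single comma-separated list of
 * "nodeId=address" entries; the helpers below convert between that string form and a Map.
 */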
private Map<String, String> getMapProperty(Properties properties, String propertyName, String defaultValue) {
String propertyValue = properties.getProperty(propertyName, defaultValue);
return getMapProperty(propertyValue);
}
private Map<String, String> getMapProperty(String propertyValue) {
String[] propertyValueArray = propertyValue.split(",");
Map<String, String> propertyMapValue = new LinkedHashMap<>();
for (String propertyValueElement : propertyValueArray) {
String[] propertyValueElementParts = propertyValueElement.split("=", 2);
if (propertyValueElementParts.length == 2) {
propertyMapValue.put(propertyValueElementParts[0], propertyValueElementParts[1]);
}
}
return propertyMapValue;
}
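/**
 * Serializes the given map back to the comma-separated "key=value" form and stores it in the
 * properties, returning the previous value as a map (or null if the property was not set before).
 */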
private Map<String, String> setMapProperty(Properties properties, String propertyName, Map<String, String> propertyMapValue) {
StringBuilder propertyValueBuilder = new StringBuilder();
int entryCount = 0;
for (Map.Entry<String, String> propertyMapValueEntry : propertyMapValue.entrySet()) {
propertyValueBuilder.append(propertyMapValueEntry.getKey());
propertyValueBuilder.append("=");
propertyValueBuilder.append(propertyMapValueEntry.getValue());
if (entryCount < propertyMapValue.size() - 1) {
propertyValueBuilder.append(",");
}
entryCount++;
}
String oldPropertyValue = (String) properties.setProperty(propertyName, propertyValueBuilder.toString());
if (oldPropertyValue == null) {
return null;
}
return getMapProperty(oldPropertyValue);
}
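/**
 * Collects this node's JVM uptime, system CPU load (read through the platform MBean server) and
 * system load average, stores them in the local statistics map and broadcasts them to the rest of
 * the cluster as a ClusterSystemStatisticsEvent.
 */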
private void updateSystemStats() {
final RuntimeMXBean remoteRuntime = ManagementFactory.getRuntimeMXBean();
long uptime = remoteRuntime.getUptime();
ObjectName operatingSystemMXBeanName = ManagementFactory.getOperatingSystemMXBean().getObjectName();
Double systemCpuLoad = null;
try {
systemCpuLoad = (Double) ManagementFactory.getPlatformMBeanServer().getAttribute(operatingSystemMXBeanName, "SystemCpuLoad");
} catch (MBeanException | AttributeNotFoundException | InstanceNotFoundException | ReflectionException e) {
logger.error("Error retrieving system CPU load", e);
}
final OperatingSystemMXBean operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
double systemLoadAverage = operatingSystemMXBean.getSystemLoadAverage();
ClusterSystemStatisticsEvent clusterSystemStatisticsEvent = new ClusterSystemStatisticsEvent("org.apache.unomi.cluster.system.statistics");
Map<String,Serializable> systemStatistics = new TreeMap<>();
ArrayList<Double> systemLoadAverageArray = new ArrayList<>();
systemLoadAverageArray.add(systemLoadAverage);
systemStatistics.put("systemLoadAverage", systemLoadAverageArray);
systemStatistics.put("systemCpuLoad", systemCpuLoad);
systemStatistics.put("uptime", uptime);
clusterSystemStatisticsEvent.setStatistics(systemStatistics);
nodeSystemStatistics.put(karafCellarClusterManager.getNode().getId(), systemStatistics);
sendEvent(clusterSystemStatisticsEvent);
}
}