blob: 57d8d3eacf8756f0501feb66910f15efb4135f43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.unomi.services.services;
import org.apache.karaf.cellar.config.ClusterConfigurationEvent;
import org.apache.karaf.cellar.config.Constants;
import org.apache.karaf.cellar.core.*;
import org.apache.karaf.cellar.core.control.SwitchStatus;
import org.apache.karaf.cellar.core.event.EventProducer;
import org.apache.karaf.cellar.core.event.EventType;
import org.apache.unomi.api.ClusterNode;
import org.apache.unomi.api.services.ClusterService;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.util.*;
/**
 * Implementation of the {@link ClusterService} interface.
 * <p>
 * Cluster membership and the shared node-endpoint configuration are obtained through Karaf Cellar
 * (cluster manager, group manager, event producer). Per-node uptime and load figures are read over
 * remote JMX connections, which are cached per service URL and re-opened when they stop working.
 * <p>
 * NOTE(review): the JMX connector cache is a plain {@link LinkedHashMap} with no synchronization;
 * confirm that callers never invoke {@link #getClusterNodes()} concurrently.
 */
public class ClusterServiceImpl implements ClusterService {

    /** PID of the cluster-wide configuration that stores the endpoints of every node. */
    public static final String KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION = "org.apache.unomi.nodes";
    /** Property holding the comma-separated {@code nodeId=address} list of public endpoints. */
    public static final String KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS = "publicEndpoints";
    /** Property holding the comma-separated {@code nodeId=address} list of internal endpoints. */
    public static final String KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS = "internalEndpoints";

    private static final Logger logger = LoggerFactory.getLogger(ClusterServiceImpl.class.getName());

    PersistenceService persistenceService;
    private ClusterManager karafCellarClusterManager;
    private EventProducer karafCellarEventProducer;
    private GroupManager karafCellarGroupManager;
    private String karafCellarGroupName = Configurations.DEFAULT_GROUP_NAME;
    private ConfigurationAdmin osgiConfigurationAdmin;
    private String karafJMXUsername = "karaf";
    private String karafJMXPassword = "karaf";
    private int karafJMXPort = 1099;
    private String publicAddress;
    private String internalAddress;
    /** Open JMX connectors, keyed by the JMX service URL they were created for. */
    private final Map<String, JMXConnector> jmxConnectors = new LinkedHashMap<>();

    public void setPersistenceService(PersistenceService persistenceService) {
        this.persistenceService = persistenceService;
    }

    public void setKarafCellarClusterManager(ClusterManager karafCellarClusterManager) {
        this.karafCellarClusterManager = karafCellarClusterManager;
    }

    public void setKarafCellarEventProducer(EventProducer karafCellarEventProducer) {
        this.karafCellarEventProducer = karafCellarEventProducer;
    }

    public void setKarafCellarGroupManager(GroupManager karafCellarGroupManager) {
        this.karafCellarGroupManager = karafCellarGroupManager;
    }

    public void setKarafCellarGroupName(String karafCellarGroupName) {
        this.karafCellarGroupName = karafCellarGroupName;
    }

    public void setOsgiConfigurationAdmin(ConfigurationAdmin osgiConfigurationAdmin) {
        this.osgiConfigurationAdmin = osgiConfigurationAdmin;
    }

    public void setKarafJMXUsername(String karafJMXUsername) {
        this.karafJMXUsername = karafJMXUsername;
    }

    public void setKarafJMXPassword(String karafJMXPassword) {
        this.karafJMXPassword = karafJMXPassword;
    }

    public void setKarafJMXPort(int karafJMXPort) {
        this.karafJMXPort = karafJMXPort;
    }

    public void setPublicAddress(String publicAddress) {
        this.publicAddress = publicAddress;
    }

    public void setInternalAddress(String internalAddress) {
        this.internalAddress = internalAddress;
    }

    /**
     * Registers this node's public and internal addresses in the cluster-wide node configuration.
     * <p>
     * Registration only happens when the event producer, cluster manager and group manager are all
     * set, the cluster group exists (it is created when missing), the event producer is switched on
     * and the node configuration PID is allowed outbound for the group. Every failed precondition is
     * logged and registration is skipped; the method itself never fails because of them.
     */
    public void init() {
        if (karafCellarEventProducer != null && karafCellarClusterManager != null && karafCellarGroupManager != null) {
            boolean setupConfigOk = true;
            Group group = karafCellarGroupManager.findGroupByName(karafCellarGroupName);
            if (group == null) {
                // A missing group is recoverable, so this is a warning rather than an error.
                logger.warn("Cluster group " + karafCellarGroupName + " doesn't exist, creating it...");
                group = karafCellarGroupManager.createGroup(karafCellarGroupName);
                if (group == null) {
                    logger.error("Unable to create cluster group " + karafCellarGroupName);
                    setupConfigOk = false;
                }
            }
            // check if the producer is ON
            if (setupConfigOk && karafCellarEventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
                logger.error("Cluster event producer is OFF");
                setupConfigOk = false;
            }
            // check if the config pid is allowed
            if (setupConfigOk && !isClusterConfigPIDAllowed(group, Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, EventType.OUTBOUND)) {
                logger.error("Configuration PID " + KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION + " is blocked outbound for cluster group " + karafCellarGroupName);
                setupConfigOk = false;
            }
            if (setupConfigOk) {
                registerNodeEndpoints(group);
            }
        } else {
            logger.info("Karaf Cellar services are not all available, cluster node endpoints were not registered.");
        }
        logger.info("Cluster service initialized.");
    }

    /**
     * Writes this node's endpoints into the shared node configuration of the cluster group and
     * produces a {@link ClusterConfigurationEvent} for that configuration PID.
     *
     * @param group the cluster group set as the source group of the produced event.
     */
    private void registerNodeEndpoints(Group group) {
        Map<String, Properties> configurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
        org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
        Properties karafCellarClusterNodeConfiguration = configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
        if (karafCellarClusterNodeConfiguration == null) {
            karafCellarClusterNodeConfiguration = new Properties();
        }

        Map<String, String> publicEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + publicAddress);
        publicEndpoints.put(thisKarafNode.getId(), publicAddress);
        setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, publicEndpoints);

        Map<String, String> internalEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + internalAddress);
        internalEndpoints.put(thisKarafNode.getId(), internalAddress);
        setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, internalEndpoints);

        configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, karafCellarClusterNodeConfiguration);
        ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
        clusterConfigurationEvent.setSourceGroup(group);
        karafCellarEventProducer.produce(clusterConfigurationEvent);
    }

    /**
     * Closes every cached JMX connector and empties the cache. A connector that fails to close is
     * logged and does not prevent the remaining ones from being closed.
     */
    public void destroy() {
        for (Map.Entry<String, JMXConnector> jmxConnectorEntry : jmxConnectors.entrySet()) {
            String url = jmxConnectorEntry.getKey();
            JMXConnector jmxConnector = jmxConnectorEntry.getValue();
            try {
                jmxConnector.close();
            } catch (IOException e) {
                logger.error("Error closing JMX connector for url {}", url, e);
            }
        }
        // Do not keep references to connectors that have just been closed.
        jmxConnectors.clear();
        logger.info("Cluster service shutdown.");
    }

    /**
     * Lists the nodes known to the Karaf Cellar cluster manager.
     * <p>
     * Each returned node carries the public/internal address registered for it in the shared node
     * configuration (when present) and, when its JMX endpoint can be queried, its uptime, system
     * load average and system CPU load. A node whose JMX endpoint cannot be queried is still
     * returned, just without those figures.
     *
     * @return one {@link ClusterNode} per distinct Cellar node id, in the iteration order of the
     * cluster manager's node set.
     */
    @Override
    public List<ClusterNode> getClusterNodes() {
        Map<String, ClusterNode> clusterNodes = new LinkedHashMap<String, ClusterNode>();

        Set<org.apache.karaf.cellar.core.Node> karafCellarNodes = karafCellarClusterManager.listNodes();
        org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
        Map<String, Properties> clusterConfigurations = karafCellarClusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
        Properties karafCellarClusterNodeConfiguration = clusterConfigurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);

        Map<String, String> publicNodeEndpoints = new TreeMap<>();
        Map<String, String> internalNodeEndpoints = new TreeMap<>();
        if (karafCellarClusterNodeConfiguration != null) {
            publicNodeEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, thisKarafNode.getId() + "=" + publicAddress);
            internalNodeEndpoints = getMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, thisKarafNode.getId() + "=" + internalAddress);
        }

        for (org.apache.karaf.cellar.core.Node karafCellarNode : karafCellarNodes) {
            ClusterNode clusterNode = new ClusterNode();
            String publicEndpoint = publicNodeEndpoints.get(karafCellarNode.getId());
            if (publicEndpoint != null) {
                clusterNode.setPublicHostAddress(publicEndpoint);
            }
            String internalEndpoint = internalNodeEndpoints.get(karafCellarNode.getId());
            if (internalEndpoint != null) {
                clusterNode.setInternalHostAddress(internalEndpoint);
            }

            String serviceUrl = "service:jmx:rmi:///jndi/rmi://" + karafCellarNode.getHost() + ":" + karafJMXPort + "/karaf-root";
            populateJmxStatistics(clusterNode, serviceUrl);

            clusterNodes.put(karafCellarNode.getId(), clusterNode);
        }

        return new ArrayList<ClusterNode>(clusterNodes.values());
    }

    /**
     * Fills uptime, load average and CPU load of a node from its remote JMX server.
     * Every failure is logged and leaves the corresponding figures unset; nothing is rethrown for
     * the exception types handled below.
     *
     * @param clusterNode the node to populate.
     * @param serviceUrl  the JMX service URL of that node.
     */
    private void populateJmxStatistics(ClusterNode clusterNode, String serviceUrl) {
        try {
            JMXConnector jmxConnector = getJMXConnector(serviceUrl);
            MBeanServerConnection mbsc = jmxConnector.getMBeanServerConnection();

            final RuntimeMXBean remoteRuntime = ManagementFactory.newPlatformMXBeanProxy(mbsc, ManagementFactory.RUNTIME_MXBEAN_NAME, RuntimeMXBean.class);
            clusterNode.setUptime(remoteRuntime.getUptime());

            ObjectName operatingSystemMXBeanName = new ObjectName(ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME);
            Double systemCpuLoad = null;
            try {
                systemCpuLoad = (Double) mbsc.getAttribute(operatingSystemMXBeanName, "SystemCpuLoad");
            } catch (MBeanException | AttributeNotFoundException e) {
                // The attribute is optional: keep going and report the other figures.
                logger.error("Error retrieving system CPU load", e);
            }

            final OperatingSystemMXBean remoteOperatingSystemMXBean = ManagementFactory.newPlatformMXBeanProxy(mbsc, ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME, OperatingSystemMXBean.class);
            clusterNode.setLoadAverage(new double[]{remoteOperatingSystemMXBean.getSystemLoadAverage()});
            if (systemCpuLoad != null) {
                clusterNode.setCpuLoad(systemCpuLoad);
            }
        } catch (MalformedURLException e) {
            logger.error("Error connecting to remote JMX server", e);
        } catch (ConnectException | java.rmi.ConnectException | java.rmi.ConnectIOException e) {
            handleTimeouts(serviceUrl, e);
        } catch (java.lang.reflect.UndeclaredThrowableException e) {
            // The MXBean proxy methods declare no checked exception, so an I/O failure during
            // getUptime()/getSystemLoadAverage() surfaces wrapped in this unchecked exception.
            // Handle it like a lost connection so one bad node does not fail the whole listing.
            handleTimeouts(serviceUrl, e);
        } catch (IOException | MalformedObjectNameException | InstanceNotFoundException | ReflectionException e) {
            logger.error("Error retrieving remote JMX data", e);
        }
    }

    /**
     * Handles a broken JMX connection: logs the root cause at warn level (full stack trace at debug
     * level), removes the connector from the cache and closes it on a best-effort basis so that the
     * next request opens a fresh connection.
     *
     * @param serviceUrl the JMX service URL whose connection failed.
     * @param throwable  the failure that was caught.
     */
    private void handleTimeouts(String serviceUrl, Throwable throwable) {
        Throwable rootCause = throwable;
        while (rootCause.getCause() != null) {
            rootCause = rootCause.getCause();
        }
        logger.warn("JMX RMI Connection error, will reconnect on next request. Active debug logging for access to detailed stack trace. Root cause=" + rootCause.getMessage());
        logger.debug("Detailed stacktrace", throwable);
        JMXConnector jmxConnector = jmxConnectors.remove(serviceUrl);
        try {
            if (jmxConnector != null) {
                jmxConnector.close();
            }
        } catch (IOException | RuntimeException ignored) {
            // ignore any exception when closing a timed out connection.
        }
    }

    /**
     * Delegates to {@link PersistenceService#purge(Date)}.
     *
     * @param date the date passed through to the persistence service.
     */
    @Override
    public void purge(Date date) {
        persistenceService.purge(date);
    }

    /**
     * Delegates to {@link PersistenceService#purge(String)}.
     *
     * @param scope the scope passed through to the persistence service.
     */
    @Override
    public void purge(String scope) {
        persistenceService.purge(scope);
    }

    /**
     * Check if a configuration is allowed.
     *
     * @param group    the cluster group.
     * @param category the configuration category constant.
     * @param pid      the configuration PID.
     * @param type     the cluster event type.
     * @return true if the cluster event type is allowed, false otherwise.
     */
    public boolean isClusterConfigPIDAllowed(Group group, String category, String pid, EventType type) {
        CellarSupport support = new CellarSupport();
        support.setClusterManager(this.karafCellarClusterManager);
        support.setGroupManager(this.karafCellarGroupManager);
        support.setConfigurationAdmin(this.osgiConfigurationAdmin);
        return support.isAllowed(group, category, pid, type);
    }

    /**
     * Returns a usable JMX connector for the given service URL.
     * <p>
     * A cached connector is reused when it can still provide an MBean server connection. Otherwise
     * it is dropped from the cache and closed, and a new connection is opened, cached and returned.
     *
     * @param url the JMX service URL.
     * @return a connected JMX connector.
     * @throws IOException if the URL is malformed or a new connection cannot be established.
     */
    private JMXConnector getJMXConnector(String url) throws IOException {
        JMXConnector cachedConnector = jmxConnectors.get(url);
        if (cachedConnector != null) {
            try {
                cachedConnector.getMBeanServerConnection();
                return cachedConnector;
            } catch (IOException e) {
                jmxConnectors.remove(url);
                try {
                    cachedConnector.close();
                } catch (IOException e1) {
                    logger.warn("Closing invalid JMX connection resulted in :" + e1.getMessage() + ", this is probably ok.");
                    logger.debug("Error closing invalid JMX connection", e1);
                }
                if (e.getMessage() != null && e.getMessage().contains("Connection closed")) {
                    logger.warn("JMX connection to url {} was closed (Cause:{}). Reconnecting...", url, e.getMessage());
                } else {
                    logger.error("Error using the JMX connection to url {}, closed and will reconnect", url, e);
                }
            }
        }

        // if we reach this point either we didn't have a connector or it didn't validate
        // now let's connect to remote JMX service to retrieve information from the runtime and operating system MX beans
        JMXServiceURL jmxServiceURL = new JMXServiceURL(url);
        Map<String, Object> environment = new HashMap<String, Object>();
        if (karafJMXUsername != null && karafJMXPassword != null) {
            environment.put(JMXConnector.CREDENTIALS, new String[]{karafJMXUsername, karafJMXPassword});
        }
        JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxServiceURL, environment);
        jmxConnectors.put(url, jmxConnector);
        return jmxConnector;
    }

    /**
     * Reads a property and parses it as a comma-separated list of {@code key=value} pairs.
     *
     * @param properties   the properties to read from.
     * @param propertyName the name of the property.
     * @param defaultValue the raw value to parse when the property is absent.
     * @return a mutable, insertion-ordered map of the parsed pairs.
     */
    private Map<String, String> getMapProperty(Properties properties, String propertyName, String defaultValue) {
        String propertyValue = properties.getProperty(propertyName, defaultValue);
        return getMapProperty(propertyValue);
    }

    /**
     * Parses a comma-separated list of {@code key=value} pairs.
     * <p>
     * Each element is split on its first {@code '='} only, so values may themselves contain
     * {@code '='} and may be empty. Elements without any {@code '='} (including the empty elements
     * produced by an empty input or by stray commas) are skipped instead of failing.
     *
     * @param propertyValue the raw value, may be {@code null}.
     * @return a mutable, insertion-ordered map of the parsed pairs; empty when nothing could be parsed.
     */
    private Map<String, String> getMapProperty(String propertyValue) {
        Map<String, String> propertyMapValue = new LinkedHashMap<>();
        if (propertyValue == null) {
            return propertyMapValue;
        }
        for (String propertyValueElement : propertyValue.split(",")) {
            int separatorIndex = propertyValueElement.indexOf('=');
            if (separatorIndex < 0) {
                // Not a key=value pair: nothing usable to record.
                continue;
            }
            propertyMapValue.put(propertyValueElement.substring(0, separatorIndex), propertyValueElement.substring(separatorIndex + 1));
        }
        return propertyMapValue;
    }

    /**
     * Serializes a map as a comma-separated list of {@code key=value} pairs and stores it under the
     * given property name.
     *
     * @param properties       the properties to write to.
     * @param propertyName     the name of the property.
     * @param propertyMapValue the map to store.
     * @return the previously stored value parsed as a map, or {@code null} if the property was not set.
     */
    private Map<String, String> setMapProperty(Properties properties, String propertyName, Map<String, String> propertyMapValue) {
        StringBuilder propertyValueBuilder = new StringBuilder();
        for (Map.Entry<String, String> propertyMapValueEntry : propertyMapValue.entrySet()) {
            // Separator goes between entries only, never after the last one.
            if (propertyValueBuilder.length() > 0) {
                propertyValueBuilder.append(",");
            }
            propertyValueBuilder.append(propertyMapValueEntry.getKey());
            propertyValueBuilder.append("=");
            propertyValueBuilder.append(propertyMapValueEntry.getValue());
        }
        String oldPropertyValue = (String) properties.setProperty(propertyName, propertyValueBuilder.toString());
        if (oldPropertyValue == null) {
            return null;
        }
        return getMapProperty(oldPropertyValue);
    }
}