[KARAF-3613] Switch to EventAdmin
diff --git a/README b/README
index f5a18e6..c7fc4ac 100644
--- a/README
+++ b/README
@@ -52,14 +52,14 @@
You have to register the Decater features repository:
-karaf@root()> feature:repo-add mvn:org.apache.karaf.decanter/decanter/3.0.0-SNAPSHOT/xml/features
+karaf@root()> feature:repo-add mvn:org.apache.karaf.decanter/apache-karaf-decanter/3.0.0-SNAPSHOT/xml/features
It's up to you to choose the features to install, depending of the systems that you want:
* decanter-simple-scheduler
This feature installs a very simple Decanter Scheduler using a Thread.
-karaf@root()> feature:install decanter-simple-schedular
+karaf@root()> feature:install decanter-simple-scheduler
* decanter-collector-log
This feature installs a Decanter Collector listening for all log messages happening in Karaf.
diff --git a/api/src/main/java/org/apache/karaf/decanter/api/Appender.java b/api/src/main/java/org/apache/karaf/decanter/api/Appender.java
deleted file mode 100644
index 37c8dd5..0000000
--- a/api/src/main/java/org/apache/karaf/decanter/api/Appender.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.api;
-
-import java.util.Map;
-
-/**
- * Append collected data to a target system
- */
-public interface Appender {
-
- /**
- * Append data to a target system.
- *
- * @param data the data to dispatch.
- * @throws Exception in case of appending failure.
- */
- public void append(Map<Long, Map<String, Object>> data) throws Exception;
-
-}
diff --git a/api/src/main/java/org/apache/karaf/decanter/api/Collector.java b/api/src/main/java/org/apache/karaf/decanter/api/Collector.java
deleted file mode 100644
index 0a58ab4..0000000
--- a/api/src/main/java/org/apache/karaf/decanter/api/Collector.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.api;
-
-/**
- * Generic decanter collector (can be event driven or polling collector)/
- */
-public interface Collector {
-}
diff --git a/api/src/main/java/org/apache/karaf/decanter/api/Dispatcher.java b/api/src/main/java/org/apache/karaf/decanter/api/Dispatcher.java
deleted file mode 100644
index ae822b8..0000000
--- a/api/src/main/java/org/apache/karaf/decanter/api/Dispatcher.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.api;
-
-import java.util.Map;
-
-/**
- * Describe the dispatcher service responsible to calling all appender services available.
- */
-public interface Dispatcher {
-
- /**
- * Call all appender services available to dispatch collected data.
- *
- * @param data the collected data to dispatch.
- * @throws Exception in case of appending failure.
- */
- public void dispatch(Map<Long, Map<String, Object>> data) throws Exception;
-
-}
diff --git a/api/src/main/java/org/apache/karaf/decanter/api/PollingCollector.java b/api/src/main/java/org/apache/karaf/decanter/api/PollingCollector.java
deleted file mode 100644
index 9bfa657..0000000
--- a/api/src/main/java/org/apache/karaf/decanter/api/PollingCollector.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.api;
-
-import java.util.Map;
-
-/**
- * Interface describing a Decanter polling collector service (for instance log messages, JMX metrics, etc)
- */
-public interface PollingCollector extends Collector {
-
- /**
- * Collect data to send to the appender.
- *
- * @return the list of collected data.
- * @throws Exception in case of collection failure.
- */
- public Map<Long, Map<String, Object>> collect() throws Exception;
-
-}
diff --git a/appender/elasticsearch/pom.xml b/appender/elasticsearch/pom.xml
index 09291dc..9cf873f 100644
--- a/appender/elasticsearch/pom.xml
+++ b/appender/elasticsearch/pom.xml
@@ -29,12 +29,6 @@
<dependencies>
- <!-- Decanter -->
- <dependency>
- <groupId>org.apache.karaf.decanter</groupId>
- <artifactId>org.apache.karaf.decanter.api</artifactId>
- </dependency>
-
<!-- Elasticsearch -->
<dependency>
<groupId>org.elasticsearch</groupId>
@@ -47,6 +41,11 @@
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
+
<!-- JSON builder -->
<dependency>
diff --git a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
index f43d137..553d778 100644
--- a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
+++ b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/Activator.java
@@ -19,9 +19,10 @@
import java.util.Dictionary;
import java.util.Hashtable;
-import org.apache.karaf.decanter.api.Appender;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
public class Activator implements BundleActivator {
@@ -32,12 +33,12 @@
appender = new ElasticsearchAppender("localhost", 9300);
appender.open();
Dictionary<String, String> properties = new Hashtable<>();
- properties.put("name", "elasticsearch");
- bundleContext.registerService(Appender.class, appender, properties);
+ properties.put(EventConstants.EVENT_TOPIC, "decanter/*");
+ bundleContext.registerService(EventHandler.class, appender, properties);
}
public void stop(BundleContext bundleContext) {
- appender.close();;
+ appender.close();
}
}
diff --git a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
index ab1d791..ce9f517 100644
--- a/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
+++ b/appender/elasticsearch/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
@@ -20,6 +20,7 @@
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TimeZone;
@@ -28,18 +29,19 @@
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
-import org.apache.karaf.decanter.api.Appender;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Karaf Decanter appender which inserts into Elasticsearch
*/
-public class ElasticsearchAppender implements Appender {
+public class ElasticsearchAppender implements EventHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAppender.class);
@@ -75,27 +77,29 @@
client.close();
}
- public void append(Map<Long, Map<String, Object>> data) throws Exception {
+ @Override
+ public void handleEvent(Event event) {
try {
- for (Entry<Long, Map<String, Object>> entry : data.entrySet()) {
- send(client, new Date(entry.getKey()), entry.getValue());
- }
+ send(client, event);
} catch (Exception e) {
LOGGER.warn("Can't append into Elasticsearch", e);
}
}
- private void send(Client client, Date date, Map<String, Object> props) {
- props.put("@timestamp", tsFormat.format(date));
+ @SuppressWarnings("unchecked")
+ private void send(Client client, Event event) {
+ Map<String, Object> props = new HashMap<>();
+ Long ts = (Long)event.getProperty("timestamp");
+ Date date = ts != null ? new Date((Long)ts) : new Date();
+
JsonObjectBuilder jsonObjectBuilder = Json.createObjectBuilder();
- for (Entry<String, Object> valueEntry : props.entrySet()) {
- Object value = valueEntry.getValue();
+ jsonObjectBuilder.add("@timestamp", tsFormat.format(date));
+ for (String key : event.getPropertyNames()) {
+ Object value = event.getProperty(key);
if (value instanceof String) {
- jsonObjectBuilder.add(valueEntry.getKey(), (String)value);
+ jsonObjectBuilder.add(key, (String) value);
} else if (value instanceof Map) {
- @SuppressWarnings("unchecked")
- JsonObject jsonO = asJson(jsonObjectBuilder, (Map<String, Object>)value);
- jsonObjectBuilder.add(valueEntry.getKey(), jsonO);
+ jsonObjectBuilder.add(key, build((Map<String, Object>) value));
}
}
JsonObject jsonObject = jsonObjectBuilder.build();
@@ -103,25 +107,27 @@
client.prepareIndex(indexName, "karaf_event").setSource(jsonObject.toString()).execute().actionGet();
}
- private String getIndexName(String prefix, Date date) {
- return prefix + "-" + indexDateFormat.format(date);
- }
-
- private JsonObject asJson(JsonObjectBuilder jsonObjectBuilder, Map<String, Object> value) {
+ private JsonObject build(Map<String, Object> value) {
JsonObjectBuilder innerBuilder = Json.createObjectBuilder();
for (Entry<String, Object> innerEntrySet : value.entrySet()) {
- String key = innerEntrySet.getKey();
- Object object = innerEntrySet.getValue();
- if (object instanceof String)
- innerBuilder.add(key, (String)object);
- else if (object instanceof Long)
- innerBuilder.add(key, (Long)object);
- else if (object instanceof Integer)
- innerBuilder.add(key, (Integer)object);
- else if (object instanceof Float)
- innerBuilder.add(key, (Float)object);
+ addProperty(innerBuilder, innerEntrySet.getKey(), innerEntrySet.getValue());
}
return innerBuilder.build();
}
+ private void addProperty(JsonObjectBuilder innerBuilder, String innerKey, Object innerValue) {
+ if (innerValue instanceof String)
+ innerBuilder.add(innerKey, (String) innerValue);
+ else if (innerValue instanceof Long)
+ innerBuilder.add(innerKey, (Long) innerValue);
+ else if (innerValue instanceof Integer)
+ innerBuilder.add(innerKey, (Integer) innerValue);
+ else if (innerValue instanceof Float)
+ innerBuilder.add(innerKey, (Float) innerValue);
+ }
+
+ private String getIndexName(String prefix, Date date) {
+ return prefix + "-" + indexDateFormat.format(date);
+ }
+
}
diff --git a/appender/log/pom.xml b/appender/log/pom.xml
index 2e93e11..788555f 100644
--- a/appender/log/pom.xml
+++ b/appender/log/pom.xml
@@ -35,17 +35,15 @@
<dependencies>
- <!-- Decanter API -->
- <dependency>
- <groupId>org.apache.karaf.decanter</groupId>
- <artifactId>org.apache.karaf.decanter.api</artifactId>
- </dependency>
-
<!-- OSGi -->
<dependency>
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
<!-- SLF4J -->
<dependency>
diff --git a/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/Activator.java b/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/Activator.java
index c244d84..789ce8c 100644
--- a/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/Activator.java
+++ b/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/Activator.java
@@ -16,29 +16,26 @@
*/
package org.apache.karaf.decanter.appender.log;
-import org.apache.karaf.decanter.api.Appender;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Properties;
+
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
-
-import java.util.Dictionary;
-import java.util.Properties;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
public class Activator implements BundleActivator {
- private ServiceRegistration service;
-
public void start(BundleContext bundleContext) {
- Appender appender = new LogAppender();
- Properties properties = new Properties();
- properties.put("name", "log");
- service = bundleContext.registerService(Appender.class, appender, (Dictionary) properties);
+ LogAppender appender = new LogAppender();
+ Dictionary<String, String> properties = new Hashtable<>();
+ properties.put(EventConstants.EVENT_TOPIC, "decanter/events/*");
+ bundleContext.registerService(EventHandler.class, appender, properties);
}
public void stop(BundleContext bundleContext) {
- if (service != null) {
- service.unregister();
- }
}
}
diff --git a/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java b/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
index af60665..742dc18 100644
--- a/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
+++ b/appender/log/src/main/java/org/apache/karaf/decanter/appender/log/LogAppender.java
@@ -16,29 +16,25 @@
*/
package org.apache.karaf.decanter.appender.log;
-import org.apache.karaf.decanter.api.Appender;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
/**
* Karaf Decanter Log Appender, logging the collected data using.
*/
-public class LogAppender implements Appender {
+public class LogAppender implements EventHandler {
private final Logger LOGGER = LoggerFactory.getLogger(LogAppender.class);
- public void append(Map<Long, Map<String, Object>> data) {
- for (Long key : data.keySet()) {
- Map<String, Object> inner = data.get(key);
- StringBuilder builder = new StringBuilder();
- builder.append(key).append(" - ");
- for (String innerKey : inner.keySet()) {
- builder.append(innerKey).append(":").append(inner.get(innerKey).toString()).append(" | ");
- }
- LOGGER.info(builder.toString());
+ @Override
+ public void handleEvent(Event event) {
+ StringBuilder builder = new StringBuilder();
+ for (String innerKey : event.getPropertyNames()) {
+ builder.append(innerKey).append(":").append(event.getProperty(innerKey).toString()).append(" | ");
}
+ LOGGER.info(builder.toString());
}
}
diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index 98eba4a..933a08c 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -19,8 +19,8 @@
<features name="karaf-decanter-${project.version}" xmlns="http://karaf.apache.org/xmlns/features/v1.2.1" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://karaf.apache.org/xmlns/features/v1.2.1 http://karaf.apache.org/xmlns/features/v1.2.1">
<feature name="decanter-common" version="${project.version}" description="Karaf Decanter API">
+ <feature>eventadmin</feature>
<bundle>mvn:org.apache.karaf.decanter/org.apache.karaf.decanter.api/${project.version}</bundle>
- <bundle>mvn:org.apache.karaf.decanter/org.apache.karaf.decanter.dispatcher/${project.version}</bundle>
</feature>
<feature name="decanter-simple-scheduler" version="${project.version}" description="Karaf Decanter Simple Scheduler">
diff --git a/collector/jmx/pom.xml b/collector/jmx/pom.xml
index fa74e18..bf4357b 100644
--- a/collector/jmx/pom.xml
+++ b/collector/jmx/pom.xml
@@ -46,6 +46,10 @@
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
<!-- SLF4J -->
<dependency>
diff --git a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/Activator.java b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/Activator.java
index 277cb7a..d55dd3f 100644
--- a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/Activator.java
+++ b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/Activator.java
@@ -16,29 +16,45 @@
*/
package org.apache.karaf.decanter.collector.jmx;
-import org.apache.karaf.decanter.api.PollingCollector;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.util.tracker.ServiceTracker;
-import java.util.Dictionary;
-import java.util.Properties;
-
+@SuppressWarnings("rawtypes")
public class Activator implements BundleActivator {
- private ServiceRegistration service;
+ private ServiceTracker<EventAdmin, ServiceRegistration> tracker;
- public void start(BundleContext bundleContext) throws Exception {
- JmxCollector collector = new JmxCollector();
- Properties properties = new Properties();
- properties.put("name", "jmx");
- service = bundleContext.registerService(PollingCollector.class, collector, (Dictionary) properties);
+ public void start(final BundleContext bundleContext) throws Exception {
+ tracker = new ServiceTracker<EventAdmin, ServiceRegistration>(bundleContext, EventAdmin.class, null) {
+
+ @Override
+ public ServiceRegistration<?> addingService(ServiceReference<EventAdmin> reference) {
+ EventAdmin eventAdmin = bundleContext.getService(reference);
+ JmxCollector collector = new JmxCollector(eventAdmin);
+ Dictionary<String, String> properties = new Hashtable<String, String>();
+ properties.put("decanter.collector.name", "jmx");
+ return bundleContext.registerService(Runnable.class, collector, properties);
+ }
+
+ @Override
+ public void removedService(ServiceReference<EventAdmin> reference, ServiceRegistration reg) {
+ reg.unregister();
+ super.removedService(reference, reg);
+ }
+
+ };
+ tracker.open();
}
public void stop(BundleContext bundleContext) throws Exception {
- if (service != null) {
- service.unregister();
- }
+ tracker.close();
}
}
diff --git a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
index 6fb5dd9..5f04026 100644
--- a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
+++ b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
@@ -27,56 +27,66 @@
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
-import org.apache.karaf.decanter.api.PollingCollector;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Decanter JMX Pooling Collector
*/
-public class JmxCollector implements PollingCollector {
+public class JmxCollector implements Runnable {
private final static Logger LOGGER = LoggerFactory.getLogger(JmxCollector.class);
+ private EventAdmin eventAdmin;
- public Map<Long, Map<String, Object>> collect() throws Exception {
+ public JmxCollector(EventAdmin eventAdmin) {
+ this.eventAdmin = eventAdmin;
+ }
+
+ @Override
+ public void run() {
LOGGER.debug("Karaf Decanter JMX Collector starts harvesting ...");
- Map<Long, Map<String, Object>> collected = new HashMap<>();
-
// TODO be able to pool remote JMX
MBeanServer server = ManagementFactory.getPlatformMBeanServer();
Set<ObjectName> names = server.queryNames(null, null);
for (ObjectName name : names) {
- MBeanAttributeInfo[] attributes = server.getMBeanInfo(name).getAttributes();
- Map<String, Object> data = new HashMap<>();
- data.put("mbean", name.toString());
- for (MBeanAttributeInfo attribute : attributes) {
- // TODO add SLA check on attributes and filtering
- try {
- Object attributeObject = server.getAttribute(name, attribute.getName());
- if (attributeObject instanceof String) {
- data.put(attribute.getName(), (String) attributeObject);
- } else if (attributeObject instanceof CompositeDataSupport) {
- CompositeDataSupport cds = (CompositeDataSupport) attributeObject;
- CompositeType compositeType = cds.getCompositeType();
- Set<String> keySet = compositeType.keySet();
- Map<String, Object> composite = new HashMap<String, Object>();
- for (String key : keySet) {
- Object cdsObject = cds.get(key);
- composite.put(key, cdsObject);
- }
- data.put(attribute.getName(), composite);
- }
- } catch (Exception e) {
- // LOGGER.warn("Can't put MBean {} attribute {} in collected data", name.toString(), attribute.getName(), e);
- }
+ try {
+ Map<String, Object> data = harvestBean(server, name);
+ Event event = new Event("decanter/jmx", data);
+ eventAdmin.postEvent(event);
+ } catch (Exception e) {
+ LOGGER.warn("Error reading mbean " + name, e);
}
- collected.put(System.currentTimeMillis(), data);
}
LOGGER.debug("Karaf Decanter JMX Collector harvesting done");
+ }
- return collected;
+ private Map<String, Object> harvestBean(MBeanServer server, ObjectName name) throws Exception {
+ MBeanAttributeInfo[] attributes = server.getMBeanInfo(name).getAttributes();
+ Map<String, Object> data = new HashMap<>();
+ data.put("mbean", name.toString());
+ for (MBeanAttributeInfo attribute : attributes) {
+ // TODO add SLA check on attributes and filtering
+ Object attributeObject = server.getAttribute(name, attribute.getName());
+ if (attributeObject instanceof String) {
+ data.put(attribute.getName(), (String)attributeObject);
+ } else if (attributeObject instanceof CompositeDataSupport) {
+ CompositeDataSupport cds = (CompositeDataSupport)attributeObject;
+ CompositeType compositeType = cds.getCompositeType();
+ Set<String> keySet = compositeType.keySet();
+ Map<String, Object> composite = new HashMap<String, Object>();
+ for (String key : keySet) {
+ Object cdsObject = cds.get(key);
+ composite.put(key, cdsObject);
+ }
+ data.put(attribute.getName(), composite);
+ }
+
+ }
+ return data;
}
}
diff --git a/collector/log/pom.xml b/collector/log/pom.xml
index ee6fc59..898ce55 100644
--- a/collector/log/pom.xml
+++ b/collector/log/pom.xml
@@ -35,12 +35,6 @@
<dependencies>
- <!-- Decanter API -->
- <dependency>
- <groupId>org.apache.karaf.decanter</groupId>
- <artifactId>org.apache.karaf.decanter.api</artifactId>
- </dependency>
-
<!-- Pax Logging -->
<dependency>
<groupId>org.ops4j.pax.logging</groupId>
diff --git a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/Activator.java b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/Activator.java
index c358b4d..f1a581d 100644
--- a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/Activator.java
+++ b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/Activator.java
@@ -19,36 +19,34 @@
import java.util.Dictionary;
import java.util.Properties;
-import org.apache.karaf.decanter.api.Collector;
-import org.apache.karaf.decanter.api.Dispatcher;
import org.ops4j.pax.logging.spi.PaxAppender;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceReference;
import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.EventAdmin;
import org.osgi.util.tracker.ServiceTracker;
@SuppressWarnings("rawtypes")
public class Activator implements BundleActivator {
- private ServiceTracker<Dispatcher, ServiceRegistration> tracker;
+ private ServiceTracker<EventAdmin, ServiceRegistration> tracker;
public void start(final BundleContext bundleContext) {
- tracker = new ServiceTracker<Dispatcher, ServiceRegistration>(bundleContext, Dispatcher.class, null) {
+ tracker = new ServiceTracker<EventAdmin, ServiceRegistration>(bundleContext, EventAdmin.class, null) {
@SuppressWarnings("unchecked")
@Override
- public ServiceRegistration<?> addingService(ServiceReference<Dispatcher> reference) {
+ public ServiceRegistration<?> addingService(ServiceReference<EventAdmin> reference) {
Properties properties = new Properties();
properties.put("org.ops4j.pax.logging.appender.name", "DecanterLogCollectorAppender");
properties.put("name", "log");
- String[] ifAr = new String[] { PaxAppender.class.getName(), Collector.class.getName() };
- Dispatcher dispatcher = bundleContext.getService(reference);
- LogAppender appender = new LogAppender(dispatcher);
- return bundleContext.registerService(ifAr , appender, (Dictionary) properties);
+ EventAdmin eventAdmin = bundleContext.getService(reference);
+ LogAppender appender = new LogAppender(eventAdmin);
+ return bundleContext.registerService(PaxAppender.class , appender, (Dictionary) properties);
}
@Override
- public void removedService(ServiceReference<Dispatcher> reference, ServiceRegistration reg) {
+ public void removedService(ServiceReference<EventAdmin> reference, ServiceRegistration reg) {
reg.unregister();
super.removedService(reference, reg);
}
diff --git a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
index 7b27aab..0b52e30 100644
--- a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
+++ b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogAppender.java
@@ -19,24 +19,24 @@
import java.util.HashMap;
import java.util.Map;
-import org.apache.karaf.decanter.api.Collector;
-import org.apache.karaf.decanter.api.Dispatcher;
import org.apache.log4j.MDC;
import org.ops4j.pax.logging.spi.PaxAppender;
import org.ops4j.pax.logging.spi.PaxLoggingEvent;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Decanter log collector, event driven implementing a PaxAppender
*/
-public class LogAppender implements PaxAppender, Collector {
+public class LogAppender implements PaxAppender {
private static final String MDC_IN_LOG_APPENDER = "inLogAppender";
private final static String[] ignoredCategories = {"org.apache.karaf.decanter"};
private final static Logger LOGGER = LoggerFactory.getLogger(LogAppender.class);
- private Dispatcher dispatcher;
+ private EventAdmin dispatcher;
- public LogAppender(Dispatcher dispatcher) {
+ public LogAppender(EventAdmin dispatcher) {
this.dispatcher = dispatcher;
}
@@ -58,8 +58,8 @@
private void appendInternal(PaxLoggingEvent event) throws Exception {
LOGGER.debug("Karaf Decanter Log Collector hooked ...");
- Map<Long, Map<String, Object>> collected = new HashMap<>();
Map<String, Object> data = new HashMap<>();
+ data.put("timeStamp", event.getTimeStamp());
data.put("loggerClass", event.getFQNOfLoggerClass());
data.put("loggerName", event.getLoggerName());
data.put("threadName", event.getThreadName());
@@ -67,11 +67,10 @@
data.put("level", event.getLevel().toString());
data.put("renderedMessage", event.getRenderedMessage());
data.put("MDC", event.getProperties());
- collected.put(event.getTimeStamp(), data);
if (!isIgnored(event.getLoggerName())) {
- LOGGER.debug("Calling the Karaf Decanter Appender Controller ...");
- this.dispatcher.dispatch(collected);
+ String topic = "decanter/log/" + event.getLoggerName().replace(".", "/");
+ this.dispatcher.postEvent(new Event(topic, data));
}
}
diff --git a/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/Activator.java b/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/Activator.java
index d709892..f2a2919 100644
--- a/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/Activator.java
+++ b/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/Activator.java
@@ -16,24 +16,15 @@
*/
package org.apache.karaf.decanter.dispatcher;
-import org.apache.karaf.decanter.api.Dispatcher;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
public class Activator implements BundleActivator {
- private ServiceRegistration service;
-
public void start(BundleContext bundleContext) {
- Dispatcher dispatcher = new DefaultDispatcher(bundleContext);
- service = bundleContext.registerService(Dispatcher.class, dispatcher, null);
}
public void stop(BundleContext bundleContext) {
- if (service != null) {
- service.unregister();
- }
}
}
diff --git a/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/DefaultDispatcher.java b/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/DefaultDispatcher.java
deleted file mode 100644
index e8cb364..0000000
--- a/dispatcher/src/main/java/org/apache/karaf/decanter/dispatcher/DefaultDispatcher.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.karaf.decanter.dispatcher;
-
-import org.apache.karaf.decanter.api.Appender;
-import org.apache.karaf.decanter.api.Dispatcher;
-import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Default dispatcher
- */
-public class DefaultDispatcher implements Dispatcher {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(DefaultDispatcher.class);
-
- private BundleContext bundleContext;
-
- public DefaultDispatcher(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-
- public void dispatch(Map<Long, Map<String, Object>> data) throws Exception {
- LOGGER.debug("Dispatching collected data");
-
- Collection<ServiceReference<Appender>> references = bundleContext.getServiceReferences(Appender.class, null);
- if (references != null) {
- for (ServiceReference reference : references) {
- try {
- Appender appender = (Appender) bundleContext.getService(reference);
- appender.append(data);
- } catch (Exception e) {
- LOGGER.warn("Can't dispatch collected data", e);
- } finally {
- bundleContext.ungetService(reference);
- }
- }
- }
- LOGGER.debug("Dispatching done");
- }
-
- public BundleContext getBundleContext() {
- return bundleContext;
- }
-
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-
-}
diff --git a/scheduler/simple/pom.xml b/scheduler/simple/pom.xml
index 0ca78a6..fcc8a9f 100644
--- a/scheduler/simple/pom.xml
+++ b/scheduler/simple/pom.xml
@@ -50,6 +50,10 @@
<groupId>org.osgi</groupId>
<artifactId>org.osgi.core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
<!-- test -->
<dependency>
diff --git a/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/Activator.java b/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/Activator.java
index d723eb5..2349e41 100644
--- a/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/Activator.java
+++ b/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/Activator.java
@@ -19,25 +19,19 @@
import org.apache.karaf.decanter.api.Scheduler;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceRegistration;
public class Activator implements BundleActivator {
private SimpleScheduler scheduler;
- private ServiceRegistration service;
public void start(BundleContext bundleContext) {
scheduler = new SimpleScheduler(bundleContext);
- service = bundleContext.registerService(Scheduler.class, scheduler, null);
+ scheduler.start();
+ bundleContext.registerService(Scheduler.class, scheduler, null);
}
public void stop(BundleContext bundleContext) {
- if (scheduler != null) {
- scheduler.stop();
- }
- if (service != null) {
- service.unregister();
- }
+ scheduler.stop();
}
}
diff --git a/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java b/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java
index e42cf98..dd1ee99 100644
--- a/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java
+++ b/scheduler/simple/src/main/java/org/apache/karaf/decanter/scheduler/simple/SimpleScheduler.java
@@ -16,78 +16,52 @@
*/
package org.apache.karaf.decanter.scheduler.simple;
-import org.apache.karaf.decanter.api.Dispatcher;
-import org.apache.karaf.decanter.api.PollingCollector;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.karaf.decanter.api.Scheduler;
import org.osgi.framework.BundleContext;
-import org.osgi.framework.ServiceReference;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.util.tracker.ServiceTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* Very simple Decanter scheduler using a single thread.
*/
public class SimpleScheduler implements Runnable, Scheduler {
-
private final static Logger LOGGER = LoggerFactory.getLogger(SimpleScheduler.class);
- private BundleContext bundleContext;
private AtomicBoolean running = new AtomicBoolean(false);
private long interval = 30000L;
-
- public SimpleScheduler() {
- this.start();
+ ServiceTracker<Runnable, Runnable> collectors;
+
+ SimpleScheduler() {
+ }
+
+ public SimpleScheduler(BundleContext bundleContext) {
+ this.collectors = new ServiceTracker<>(bundleContext, collectorFilter(bundleContext), null);
+ this.collectors.open();
}
- public SimpleScheduler(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- this.start();
+ private Filter collectorFilter(BundleContext bundleContext) {
+ try {
+ return bundleContext.createFilter(String.format("(&(objectClass=%s)(decanter.collector.name=*))", Runnable.class.getName()));
+ } catch (InvalidSyntaxException e) {
+ throw new RuntimeException(e);
+ }
}
public void run() {
LOGGER.debug("Decanter SimpleScheduler thread started ...");
while (running.get()) {
- Map<Long, Map<String, Object>> collected = new HashMap<>();
- try {
- LOGGER.debug("Calling the collectors ...");
- Collection<ServiceReference<PollingCollector>> references = bundleContext.getServiceReferences(PollingCollector.class, null);
- if (references != null) {
- for (ServiceReference<PollingCollector> reference : references) {
- try {
- if (reference != null) {
- PollingCollector collector = bundleContext.getService(reference);
- Map<Long, Map<String, Object>> data = collector.collect();
- collected.putAll(data);
- }
- } catch (Exception e) {
- LOGGER.warn("Can't collect data", e);
- } finally {
- bundleContext.ungetService(reference);
- }
- }
- }
- } catch (Exception e) {
- LOGGER.warn("Can't get polling collector services", e);
- }
- ServiceReference<Dispatcher> reference = null;
- try {
- LOGGER.debug("Calling the dispatcher ...");
- reference = bundleContext.getServiceReference(Dispatcher.class);
- if (reference != null) {
- Dispatcher dispatcher = bundleContext.getService(reference);
- dispatcher.dispatch(collected);
- }
- } catch (Exception e) {
- LOGGER.warn("Can't dispatch using the controller", e);
- } finally {
- if (reference != null) {
- bundleContext.ungetService(reference);
+ LOGGER.debug("Calling the collectors ...");
+ for (Runnable collector : collectors.getServices(new Runnable[] {})) {
+ try {
+ collector.run();
+ } catch (Exception e) {
+ LOGGER.warn("Can't collect data", e);
}
}
try {
@@ -102,6 +76,9 @@
public void stop() {
running.set(false);
+ if (collectors != null) {
+ this.collectors.close();
+ }
}
public void start() {
@@ -123,12 +100,4 @@
}
}
- public BundleContext getBundleContext() {
- return bundleContext;
- }
-
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
-
}
diff --git a/scheduler/simple/src/test/java/org/apache/karaf/decanter/simple/SimpleSchedulerTest.java b/scheduler/simple/src/test/java/org/apache/karaf/decanter/scheduler/simple/SimpleSchedulerTest.java
similarity index 86%
rename from scheduler/simple/src/test/java/org/apache/karaf/decanter/simple/SimpleSchedulerTest.java
rename to scheduler/simple/src/test/java/org/apache/karaf/decanter/scheduler/simple/SimpleSchedulerTest.java
index dda757a..3d86976 100644
--- a/scheduler/simple/src/test/java/org/apache/karaf/decanter/simple/SimpleSchedulerTest.java
+++ b/scheduler/simple/src/test/java/org/apache/karaf/decanter/scheduler/simple/SimpleSchedulerTest.java
@@ -14,19 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.karaf.decanter.simple;
+package org.apache.karaf.decanter.scheduler.simple;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import org.apache.karaf.decanter.api.Scheduler;
import org.apache.karaf.decanter.scheduler.simple.SimpleScheduler;
import org.junit.Test;
-import static org.junit.Assert.*;
-
public class SimpleSchedulerTest {
@Test
public void testSimpleScheduler() throws Exception {
Scheduler scheduler = new SimpleScheduler();
+ scheduler.start();
assertTrue(scheduler.isStarted());
assertEquals("Started", scheduler.state());