[KARAF-7170] Allow to define event.topics property in all collectors
diff --git a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java
index 4aa8eea..7f0b7a7 100644
--- a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java
+++ b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java
@@ -32,18 +32,27 @@
private static final Logger LOG = LoggerFactory.getLogger(DecanterEventNotifier.class.getName());
- private EventAdmin eventAdmin;
+ private EventAdmin dispatcher;
+ private String topic = "decanter/collect/camel/event";
private String camelContextMatcher = ".*";
private String routeMatcher = ".*";
private DefaultExchangeExtender dextender = new DefaultExchangeExtender();
private DecanterCamelEventExtender extender;
- public EventAdmin getEventAdmin() {
- return eventAdmin;
+ public EventAdmin getDispatcher() {
+ return dispatcher;
}
- public void setEventAdmin(EventAdmin eventAdmin) {
- this.eventAdmin = eventAdmin;
+ public void setDispatcher(EventAdmin dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
}
public void setCamelContextMatcher(String camelContextMatcher) {
@@ -151,7 +160,7 @@
extender.extend(eventMap, (Exchange) source);
}
}
- eventAdmin.postEvent(new Event("decanter/collect/camel/event", eventMap));
+ dispatcher.postEvent(new Event(topic, eventMap));
} catch (Exception ex) {
LOG.warn("Failed to handle event", ex);
}
diff --git a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java
index 4b7cad8..5ff6bae 100644
--- a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java
+++ b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java
@@ -31,6 +31,7 @@
public class DecanterInterceptStrategy implements InterceptStrategy {
private EventAdmin dispatcher;
+ private String topic = "decanter/collect/camel/tracer";
private DefaultExchangeExtender dextender = new DefaultExchangeExtender();
private DecanterCamelEventExtender extender;
@@ -72,7 +73,7 @@
data.put(header.substring("decanter.".length()), exchange.getIn().getHeader(header));
}
}
- Event event = new Event("decanter/collect/camel/tracer", data);
+ Event event = new Event(topic, data);
dispatcher.postEvent(event);
}
@@ -87,6 +88,14 @@
public void setDispatcher(EventAdmin dispatcher) {
this.dispatcher = dispatcher;
}
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
public void setIncludeBody(boolean includeBody) {
dextender.setIncludeBody(includeBody);
diff --git a/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java
index 83cb37f..036f9d3 100644
--- a/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java
+++ b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java
@@ -20,7 +20,6 @@
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.spi.CamelEvent;
import org.junit.Assert;
import org.junit.Test;
import org.osgi.service.event.Event;
@@ -33,7 +32,7 @@
public void testEventNotifier() throws Exception {
MockEventAdmin eventAdmin = new MockEventAdmin();
DecanterEventNotifier notifier = new DecanterEventNotifier();
- notifier.setEventAdmin(eventAdmin);
+ notifier.setDispatcher(eventAdmin);
DefaultCamelContext camelContext = createCamelContext(notifier);
@@ -73,7 +72,7 @@
public void testCamelContextFilter() throws Exception {
MockEventAdmin eventAdmin = new MockEventAdmin();
DecanterEventNotifier notifier = new DecanterEventNotifier();
- notifier.setEventAdmin(eventAdmin);
+ notifier.setDispatcher(eventAdmin);
notifier.setCamelContextMatcher("foo");
DefaultCamelContext camelContext = createCamelContext(notifier);
@@ -88,7 +87,7 @@
public void testRouteIdFilter() throws Exception {
MockEventAdmin eventAdmin = new MockEventAdmin();
DecanterEventNotifier notifier = new DecanterEventNotifier();
- notifier.setEventAdmin(eventAdmin);
+ notifier.setDispatcher(eventAdmin);
notifier.setCamelContextMatcher(".*");
notifier.setRouteMatcher("foo");
@@ -104,7 +103,7 @@
public void testIgnoredEvents() throws Exception {
MockEventAdmin eventAdmin = new MockEventAdmin();
DecanterEventNotifier notifier = new DecanterEventNotifier();
- notifier.setEventAdmin(eventAdmin);
+ notifier.setDispatcher(eventAdmin);
notifier.setIgnoreCamelContextEvents(true);
DefaultCamelContext camelContext = createCamelContext(notifier);
@@ -121,7 +120,7 @@
DecanterEventNotifier notifier = new DecanterEventNotifier();
notifier.setIgnoreCamelContextEvents(true);
notifier.setIgnoreRouteEvents(true);
- notifier.setEventAdmin(eventAdmin);
+ notifier.setDispatcher(eventAdmin);
notifier.setExtender(new TestExtender());
DefaultCamelContext camelContext = createCamelContext(notifier);
diff --git a/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java b/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java
index 9757e0d..988bc25 100644
--- a/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java
+++ b/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java
@@ -27,6 +27,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@
@Reference
private ConfigurationAdmin configurationAdmin;
- private Dictionary<String, Object> properties;
+ private Dictionary<String, Object> config;
@Activate
public void activate(ComponentContext componentContext) {
@@ -57,7 +58,7 @@
}
public void activate(Dictionary<String, Object> properties) {
- this.properties = properties;
+ this.config = properties;
}
@Override
@@ -94,12 +95,14 @@
}
try {
- PropertiesPreparator.prepare(data, properties);
+ PropertiesPreparator.prepare(data, config);
} catch (Exception e) {
// nothing to do
}
- dispatcher.postEvent(new Event("decanter/collect/configadmin", data));
+ String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/configadmin";
+
+ dispatcher.postEvent(new Event(topic, data));
}
}
diff --git a/collector/dropwizard/pom.xml b/collector/dropwizard/pom.xml
index d922421..56d7ff7 100644
--- a/collector/dropwizard/pom.xml
+++ b/collector/dropwizard/pom.xml
@@ -35,6 +35,10 @@
<dependencies>
<dependency>
+ <groupId>org.apache.karaf.decanter.collector</groupId>
+ <artifactId>org.apache.karaf.decanter.collector.utils</artifactId>
+ </dependency>
+ <dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.2.3</version>
diff --git a/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java b/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java
index c3b175b..571a9c0 100644
--- a/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java
+++ b/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java
@@ -19,13 +19,18 @@
package org.apache.karaf.decanter.collector.dropwizard;
import com.codahale.metrics.*;
+import org.apache.karaf.decanter.collector.utils.PropertiesPreparator;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import java.net.InetAddress;
+import java.util.Dictionary;
import java.util.HashMap;
import java.util.Map;
@@ -45,6 +50,17 @@
@Reference(cardinality = ReferenceCardinality.OPTIONAL)
public MetricSet metricRegistry;
+ public Dictionary<String, Object> config;
+
+ @Activate
+ public void activate(ComponentContext componentContext) {
+ activate(componentContext.getProperties());
+ }
+
+ public void activate(Dictionary<String, Object> config) {
+ this.config = config;
+ }
+
@Override
public void run() {
Map<String, Metric> metrics = metricRegistry.getMetrics();
@@ -95,7 +111,16 @@
data.put("Mean Rate", timer.getMeanRate());
populateSnapshot(timer.getSnapshot(), data);
}
- Event event = new Event("decanter/collect/dropwizard", data);
+
+ try {
+ PropertiesPreparator.prepare(data, config);
+ } catch (Exception e) {
+ // nothing to do
+ }
+
+ String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/dropwizard";
+
+ Event event = new Event(topic, data);
dispatcher.postEvent(event);
}
}
diff --git a/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java b/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java
index 5122bfb..c3a0f08 100644
--- a/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java
+++ b/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java
@@ -24,6 +24,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,7 +81,7 @@
data.put("query", key.substring("query.".length()));
data.putAll(executeQuery(druidBroker, (String) config.get(key)));
PropertiesPreparator.prepare(data, config);
- String topic = (config.get("topic") != null) ? (String) config.get("topic") : "decanter/collect/druid";
+ String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/druid";
dispatcher.postEvent(new Event(topic, data));
} catch (Exception e) {
LOGGER.warn("Can't execute query {}", key.substring("query.".length()), e);
diff --git a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
index 03cad36..8ef9176 100644
--- a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
+++ b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
@@ -43,6 +43,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +66,7 @@
@Reference
private EventAdmin dispatcher;
- private Dictionary<String, Object> configuration;
+ private Dictionary<String, Object> config;
private RestHighLevelClient restClient;
@Activate
@@ -74,7 +75,7 @@
}
public void activate(Dictionary<String, Object> configuration) {
- this.configuration = configuration;
+ this.config = configuration;
String addressesString = (configuration.get("addresses") != null) ? configuration.get("addresses").toString() : "http://localhost:9200";
String username = (configuration.get("username") != null) ? configuration.get("username").toString() : null;
String password = (configuration.get("password") != null) ? configuration.get("password").toString() : null;
@@ -126,12 +127,12 @@
public void run() {
SearchRequest searchRequest = new SearchRequest();
- String index = (configuration.get("index") != null) ? configuration.get("index").toString() : "decanter";
+ String index = (config.get("index") != null) ? config.get("index").toString() : "decanter";
searchRequest.indices(index);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- String query = (configuration.get("query") != null) ? configuration.get("query").toString() : null;
+ String query = (config.get("query") != null) ? config.get("query").toString() : null;
QueryBuilder queryBuilder;
if (query == null) {
queryBuilder = QueryBuilders.matchAllQuery();
@@ -139,17 +140,17 @@
queryBuilder = QueryBuilders.queryStringQuery(query);
}
searchSourceBuilder.query(queryBuilder);
- String fromString = (configuration.get("from") != null) ? configuration.get("from").toString() : null;
+ String fromString = (config.get("from") != null) ? config.get("from").toString() : null;
if (fromString != null) {
int from = Integer.parseInt(fromString);
searchSourceBuilder.from(from);
}
- String sizeString = (configuration.get("size") != null) ? configuration.get("size").toString() : null;
+ String sizeString = (config.get("size") != null) ? config.get("size").toString() : null;
if (sizeString != null) {
int size = Integer.parseInt(sizeString);
searchSourceBuilder.size(size);
}
- String timeoutString = (configuration.get("timeout") != null) ? configuration.get("timeout").toString() : null;
+ String timeoutString = (config.get("timeout") != null) ? config.get("timeout").toString() : null;
if (timeoutString != null) {
int timeout = Integer.parseInt(timeoutString);
searchSourceBuilder.timeout(new TimeValue(timeout, TimeUnit.SECONDS));
@@ -176,12 +177,14 @@
LOGGER.error("Can't query elasticsearch", e);
}
try {
- PropertiesPreparator.prepare(data, configuration);
+ PropertiesPreparator.prepare(data, config);
} catch (Exception e) {
LOGGER.warn("Can't prepare event", e);
}
- dispatcher.postEvent(new Event("decanter/collect/elasticsearch", data));
+ String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/elasticsearch";
+
+ dispatcher.postEvent(new Event(topic, data));
}
/**
diff --git a/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java b/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java
index 44d79b0..b29aa33 100644
--- a/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java
+++ b/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java
@@ -23,6 +23,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.osgi.service.event.EventHandler;
import javax.security.auth.Subject;
@@ -38,11 +39,11 @@
@Reference
public EventAdmin dispatcher;
- private Dictionary<String, Object> properties;
+ private Dictionary<String, Object> config;
@Activate
public void activate(ComponentContext context) {
- properties = context.getProperties();
+ config = context.getProperties();
}
@Override
@@ -68,7 +69,7 @@
}
try {
- PropertiesPreparator.prepare(data, properties);
+ PropertiesPreparator.prepare(data, config);
} catch (Exception e) {
// nothing to do
}
diff --git a/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java b/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
index 8f9460d..8cbc143 100644
--- a/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
+++ b/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
@@ -36,6 +36,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +59,8 @@
private String path;
private String regex;
private Pattern compiledRegex;
+
+ private String topic;
/**
* additional properties provided by the user
@@ -88,6 +91,7 @@
if (regex != null) {
compiledRegex = Pattern.compile(regex);
}
+ topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/file/";
}
@Deactivate
@@ -120,7 +124,7 @@
LOGGER.warn("Can't fully prepare data for the dispatcher", e);
}
- Event event = new Event("decanter/collect/file/" + type, data);
+ Event event = new Event(topic + type, data);
dispatcher.postEvent(event);
}
diff --git a/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java b/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java
index 04dbd0c..ef9d4e7 100644
--- a/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java
+++ b/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java
@@ -58,7 +58,7 @@
private final static Logger LOGGER = LoggerFactory.getLogger(JdbcCollector.class);
private String query;
- private String dispatcherTopic;
+ private String topic;
private Dictionary<String, Object> properties;
private Connection connection;
private PreparedStatement preparedStatement;
@@ -66,15 +66,15 @@
@Activate
public void activate(ComponentContext context) throws Exception {
properties = context.getProperties();
- open(properties);
+ activate(properties);
}
- public void open(Dictionary<String, Object> config) throws Exception {
+ public void activate(Dictionary<String, Object> config) throws Exception {
query = getProperty(config, "query", null);
if (query == null) {
throw new IllegalStateException("Query is mandatory");
}
- dispatcherTopic = getProperty(config, EventConstants.EVENT_TOPIC, "decanter/collect/jdbc");
+ topic = getProperty(config, EventConstants.EVENT_TOPIC, "decanter/collect/jdbc");
connection = dataSource.getConnection();
preparedStatement = connection.prepareStatement(query);
@@ -101,7 +101,7 @@
List<Map<String, Object>> dataRows = query();
for (Map<String, Object> data : dataRows) {
- Event event = new Event(dispatcherTopic, data);
+ Event event = new Event(topic, data);
dispatcher.postEvent(event);
}
}
diff --git a/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java b/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java
index 1d2ecb6..8a464ab 100644
--- a/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java
+++ b/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java
@@ -62,7 +62,7 @@
collector.dataSource = dataSource;
Dictionary<String, Object> config = new Hashtable<>();
config.put("query", "select * from TEST");
- collector.open(config);
+ collector.activate(config);
List<Map<String, Object>> dataRows = collector.query();
Assert.assertEquals(2, dataRows.size());
diff --git a/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java b/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java
index 4573bbf..8e98d7e 100644
--- a/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java
+++ b/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java
@@ -26,6 +26,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
@@ -155,7 +156,8 @@
} catch (Exception e) {
// nothing to do
}
- Event event = new Event("decanter/collect/jetty", data);
+ String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/jetty";
+ Event event = new Event(topic, data);
dispatcher.postEvent(event);
}
diff --git a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
index 495cdad..35a3b0a 100644
--- a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
+++ b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
@@ -38,6 +38,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -149,13 +150,15 @@
names.addAll(connection.queryNames(getObjectName(null), null));
}
+ String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/jmx/";
+
for (ObjectName name : names) {
LOGGER.debug("Harvesting {}", name);
try {
Map<String, Object> data = harvester.harvestBean(name);
PropertiesPreparator.prepare(data, properties);
data.put("host", host);
- Event event = new Event("decanter/collect/jmx/" + this.type + "/" + getTopic(name), data);
+ Event event = new Event(topic + this.type + "/" + getTopic(name), data);
LOGGER.debug("Posting for {}", name);
dispatcher.postEvent(event);
} catch (Exception e) {
@@ -174,7 +177,7 @@
Map<String, Object> data = harvester.executeOperation(operation, objectName, operationName, arguments, signatures);
PropertiesPreparator.prepare(data, properties);
data.put("host", host);
- Event event = new Event("decanter/collect/jmx/" + this.type + "/" + getTopic(objectName), data);
+ Event event = new Event(topic + this.type + "/" + getTopic(objectName), data);
dispatcher.postEvent(event);
} else {
LOGGER.warn("{} is not well configured ({})", operation, raw);
diff --git a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java
index 98d64de..5ca4ff2 100644
--- a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java
+++ b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java
@@ -33,6 +33,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,8 +117,8 @@
if (loggerName == null || loggerName.isEmpty()) {
loggerName = "default";
}
- String topic = "decanter/collect/log/" + cleanLoggerName(loggerName);
- this.dispatcher.postEvent(new Event(topic, data));
+ String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/log/";
+ this.dispatcher.postEvent(new Event(topic + cleanLoggerName(loggerName), data));
}
/*
diff --git a/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java b/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
index 11a0486..ce79648 100644
--- a/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
+++ b/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
@@ -45,6 +45,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,12 +138,12 @@
LOGGER.warn("Can't prepare data for the dispatcher", e);
}
- String topic = loggerName2Topic(loggingEvent.getLoggerName());
- Event event = new Event(topic, data);
+ String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/log/";
+ Event event = new Event(loggerName2Topic(topic, loggingEvent.getLoggerName()), data);
dispatcher.postEvent(event);
}
- static String loggerName2Topic(String loggerName) {
+ static String loggerName2Topic(String topic, String loggerName) {
StringBuilder out = new StringBuilder();
for (int c = 0; c < loggerName.length(); c++) {
Character ch = loggerName.charAt(c);
@@ -156,7 +157,7 @@
while (outSt.length() > 1 && outSt.endsWith("/")) {
outSt = outSt.substring(0, outSt.length() - 1);
}
- return "decanter/collect/log/" + outSt.replace(".", "/");
+ return topic + outSt.replace(".", "/");
}
private void putLocation(Map<String, Object> data, LocationInfo loc) {
diff --git a/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java b/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
index 89e1688..2e462c9 100644
--- a/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
+++ b/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
@@ -73,7 +73,7 @@
@Test
public void testLoggerName2Topic() {
- String topic = SocketCollector.loggerName2Topic("test.[Tomcat].[localhost].[/]");
+ String topic = SocketCollector.loggerName2Topic("decanter/collect/log/", "test.[Tomcat].[localhost].[/]");
Assert.assertEquals("decanter/collect/log/test/Tomcat/localhost", topic);
}
diff --git a/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java b/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java
index e02a027..1148ba9 100644
--- a/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java
+++ b/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java
@@ -24,6 +24,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,11 +82,7 @@
public void activate(Dictionary<String, Object> config) throws Exception {
this.config = config;
- if (config.get("topic") != null) {
- topic = (String) config.get("topic");
- } else {
- topic = "decanter/collect/openstack";
- }
+ topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/openstack";
if (config.get("openstack.identity") == null) {
throw new IllegalStateException("openstack.identity is not configured");
}
diff --git a/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java b/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java
index e725fe3..b555958 100644
--- a/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java
+++ b/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java
@@ -23,6 +23,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.SystemInfo;
@@ -368,7 +369,9 @@
PropertiesPreparator.prepare(data, properties);
- dispatcher.postEvent(new Event("decanter/collect/oshi", data));
+ String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/oshi";
+
+ dispatcher.postEvent(new Event(topic, data));
} catch (Exception e) {
LOGGER.warn("Can't get oshi system metrics", e);
}
diff --git a/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java b/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java
index e31b594..386c455 100644
--- a/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java
+++ b/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java
@@ -23,6 +23,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,7 +91,8 @@
}
}
PropertiesPreparator.prepare(data, properties);
- dispatcher.postEvent(new Event("decanter/collect/prometheus", data));
+ String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/prometheus";
+ dispatcher.postEvent(new Event(topic, data));
} catch (Exception e) {
LOGGER.warn("Can't get Prometheus metrics", e);
e.printStackTrace();
diff --git a/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java b/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java
index 8dff199..d604be3 100644
--- a/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java
+++ b/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java
@@ -24,6 +24,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.redisson.Redisson;
import org.redisson.api.RMap;
import org.redisson.api.RedissonClient;
@@ -102,7 +103,8 @@
} catch (Exception e) {
LOGGER.warn("Can't prepare data", e);
}
- dispatcher.postEvent(new Event("decanter/collect/redis", data));
+ String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/redis";
+ dispatcher.postEvent(new Event(topic, data));
}
}
diff --git a/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java b/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java
index d27d3c2..48908fc 100644
--- a/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java
+++ b/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java
@@ -39,6 +39,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,7 +62,7 @@
private final static Logger LOGGER = LoggerFactory.getLogger(RestServletCollector.class);
- private String baseTopic;
+ private String topic;
private Dictionary<String, Object> properties;
private long maxRequestSize = 100000;
@@ -69,7 +70,7 @@
@Activate
public void activate(ComponentContext context) throws MalformedURLException {
Dictionary<String, Object> props = context.getProperties();
- this.baseTopic = getProperty(props, "topic", "decanter/collect/rest-servlet");
+ this.topic = getProperty(props, EventConstants.EVENT_TOPIC, "decanter/collect/rest-servlet");
this.properties = props;
if (this.properties.get("max.request.size") != null) {
maxRequestSize = Long.parseLong((String)this.properties.get("max.request.size"));
@@ -101,7 +102,7 @@
PropertiesPreparator.prepare(data, properties);
- Event event = new Event(baseTopic, data);
+ Event event = new Event(topic, data);
dispatcher.postEvent(event);
resp.setStatus(HttpServletResponse.SC_CREATED);
LOGGER.debug("Karaf Decanter REST Servlet Collector harvesting done");
diff --git a/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java b/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java
index 57e674d..4d534bf 100644
--- a/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java
+++ b/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java
@@ -32,6 +32,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,7 +78,7 @@
this.config = config;
this.url = new URL(getProperty(config, "url", "http://localhost:8181"));
this.paths = getProperty(config, "paths", "").split(",");
- this.topic = getProperty(config, "topic", "decanter/collect/rest");
+ this.topic = getProperty(config, EventConstants.EVENT_TOPIC, "decanter/collect/rest");
this.requestMethod = getProperty(config, "request.method", "GET");
this.user = getProperty(config, "user", null);
this.password = getProperty(config, "password", null);
diff --git a/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java b/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java
index 6779680..04ad87d 100644
--- a/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java
+++ b/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java
@@ -21,6 +21,7 @@
import org.osgi.service.component.annotations.*;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.snmp4j.*;
@@ -238,7 +239,8 @@
data.put(variableBinding.getOid().toString(), variableBinding.getVariable().toString());
}
// send event
- dispatcher.postEvent(new Event("decanter/collector/snmp", data));
+ String topic = (configuration.get(EventConstants.EVENT_TOPIC) != null) ? (String) configuration.get(EventConstants.EVENT_TOPIC) : "decanter/collector/snmp";
+ dispatcher.postEvent(new Event(topic, data));
}
private OctetString convertToOctetString(String value) {
diff --git a/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java b/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
index fb6d6e1..a7ebeff 100644
--- a/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
+++ b/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
@@ -34,6 +34,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,10 +71,7 @@
if (config.get("url") == null) {
throw new IllegalArgumentException("url property is mandatory");
}
- this.topic = "decanter/collect/soap";
- if (config.get("topic") != null) {
- this.topic = (String) config.get("topic");
- }
+ topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/soap";
url = new URL((String) config.get("url"));
if (config.get("soap.request") == null) {
throw new IllegalStateException("soap.request property is mandatory");
diff --git a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
index 996dfa5..cb1f210 100644
--- a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
+++ b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
@@ -57,7 +57,7 @@
private boolean open;
private ExecutorService executor;
private Dictionary<String, Object> properties;
- private String eventAdminTopic;
+ private String topic;
private long maxRequestSize = 100000;
@Reference
@@ -85,7 +85,7 @@
this.protocol = Protocol.TCP;
}
- eventAdminTopic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket");
+ topic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket");
switch (protocol) {
case TCP:
@@ -181,7 +181,7 @@
data.put("type", "socket");
data.putAll(unmarshaller.unmarshal(new ByteArrayInputStream(line.getBytes())));
PropertiesPreparator.prepare(data, properties);
- Event event = new Event(eventAdminTopic, data);
+ Event event = new Event(topic, data);
dispatcher.postEvent(event);
}
}
@@ -217,7 +217,7 @@
LOGGER.warn("Can't prepare data for the dispatcher", e);
}
- Event event = new Event(eventAdminTopic, data);
+ Event event = new Event(topic, data);
dispatcher.postEvent(event);
datagramSocket.send(packet);
} catch (EOFException e) {
diff --git a/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java b/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
index 599cf49..d21ba40 100644
--- a/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
+++ b/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
@@ -39,6 +39,7 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,7 +66,7 @@
@Activate
public void activate(ComponentContext context) {
this.properties = context.getProperties();
- this.topic = context.getProperties().get("topic") != null ? String.class.cast(context.getProperties().get("topic")) : "decanter/collect/system/";
+ this.topic = context.getProperties().get(EventConstants.EVENT_TOPIC) != null ? String.class.cast(context.getProperties().get(EventConstants.EVENT_TOPIC)) : "decanter/collect/system/";
if (!this.topic.endsWith("/")) {
this.topic = this.topic + "/";
}
diff --git a/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java b/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java
index 713ef91..38255a9 100644
--- a/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java
+++ b/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java
@@ -22,7 +22,6 @@
import org.apache.camel.core.osgi.OsgiDataFormatResolver;
import org.apache.camel.core.osgi.OsgiDefaultCamelContext;
import org.apache.camel.core.osgi.OsgiLanguageResolver;
-import org.apache.camel.impl.DefaultCamelContext;
import org.apache.karaf.decanter.collector.camel.DecanterEventNotifier;
import org.apache.karaf.decanter.collector.camel.DecanterInterceptStrategy;
import org.apache.karaf.itests.KarafTestSupport;
@@ -136,7 +135,7 @@
// create route with notifier
EventAdmin eventAdmin = getOsgiService(EventAdmin.class);
DecanterEventNotifier notifier = new DecanterEventNotifier();
- notifier.setEventAdmin(eventAdmin);
+ notifier.setDispatcher(eventAdmin);
RouteBuilder builder = new RouteBuilder() {
@Override
diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc b/manual/src/main/asciidoc/user-guide/collectors.adoc
index 4f45f64..0a986de 100644
--- a/manual/src/main/asciidoc/user-guide/collectors.adoc
+++ b/manual/src/main/asciidoc/user-guide/collectors.adoc
@@ -1242,6 +1242,18 @@
You have to define the locations of the OpenStack APIs and if you enabled requesting the APIs or not.
+==== Target dispatcher topics
+
+All collectors use a default Decanter dispatcher topic name. However, you can change the topic name with the one of your choice.
+
+For that, you have to set `event.topics` property in the collector configuration.
+
+For instance:
+
+----
+event.topics=my/topic/name
+----
+
==== Customizing properties in collectors
You can add, rename or remove properties collected by the collectors before sending it to the dispatcher.
diff --git a/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java b/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java
index 78c8284..bb68296 100644
--- a/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java
+++ b/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java
@@ -43,10 +43,7 @@
@Activate
public void activate(ComponentContext componentContext) {
Dictionary<String, Object> properties = componentContext.getProperties();
- String targetTopic = "decanter/process/passthrough";
- if (properties.get("target.topics") != null) {
- targetTopic = properties.get("target.topics").toString();
- }
+ targetTopic = (properties.get("target.topics") != null) ? (String) properties.get("target.topics") : "decanter/process/passthrough";
activate(targetTopic);
}