[KARAF-7170] Allow to define event.topics property in all collectors
diff --git a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java
index 4aa8eea..7f0b7a7 100644
--- a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java
+++ b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifier.java
@@ -32,18 +32,27 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(DecanterEventNotifier.class.getName());
 
-    private EventAdmin eventAdmin;
+    private EventAdmin dispatcher;
+    private String topic = "decanter/collect/camel/event";
     private String camelContextMatcher = ".*";
     private String routeMatcher = ".*";
     private DefaultExchangeExtender dextender = new DefaultExchangeExtender();
     private DecanterCamelEventExtender extender;
 
-    public EventAdmin getEventAdmin() {
-        return eventAdmin;
+    public EventAdmin getDispatcher() {
+        return dispatcher;
     }
 
-    public void setEventAdmin(EventAdmin eventAdmin) {
-        this.eventAdmin = eventAdmin;
+    public void setDispatcher(EventAdmin dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
     }
 
     public void setCamelContextMatcher(String camelContextMatcher) {
@@ -151,7 +160,7 @@
                         extender.extend(eventMap, (Exchange) source);
                     }
                 }
-                eventAdmin.postEvent(new Event("decanter/collect/camel/event", eventMap));
+                dispatcher.postEvent(new Event(topic, eventMap));
             } catch (Exception ex) {
                 LOG.warn("Failed to handle event", ex);
             }
diff --git a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java
index 4b7cad8..5ff6bae 100644
--- a/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java
+++ b/collector/camel/src/main/java/org/apache/karaf/decanter/collector/camel/DecanterInterceptStrategy.java
@@ -31,6 +31,7 @@
 public class DecanterInterceptStrategy implements InterceptStrategy {
 
     private EventAdmin dispatcher;
+    private String topic = "decanter/collect/camel/tracer";
     private DefaultExchangeExtender dextender = new DefaultExchangeExtender();
     private DecanterCamelEventExtender extender;
 
@@ -72,7 +73,7 @@
                 data.put(header.substring("decanter.".length()), exchange.getIn().getHeader(header));
             }
         }
-        Event event = new Event("decanter/collect/camel/tracer", data);
+        Event event = new Event(topic, data);
         dispatcher.postEvent(event);
     }
     
@@ -87,6 +88,14 @@
     public void setDispatcher(EventAdmin dispatcher) {
         this.dispatcher = dispatcher;
     }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
     
     public void setIncludeBody(boolean includeBody) {
         dextender.setIncludeBody(includeBody);
diff --git a/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java
index 83cb37f..036f9d3 100644
--- a/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java
+++ b/collector/camel/src/test/java/org/apache/karaf/decanter/collector/camel/DecanterEventNotifierTest.java
@@ -20,7 +20,6 @@
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
-import org.apache.camel.spi.CamelEvent;
 import org.junit.Assert;
 import org.junit.Test;
 import org.osgi.service.event.Event;
@@ -33,7 +32,7 @@
     public void testEventNotifier() throws Exception {
         MockEventAdmin eventAdmin = new MockEventAdmin();
         DecanterEventNotifier notifier = new DecanterEventNotifier();
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
 
         DefaultCamelContext camelContext = createCamelContext(notifier);
 
@@ -73,7 +72,7 @@
     public void testCamelContextFilter() throws Exception {
         MockEventAdmin eventAdmin = new MockEventAdmin();
         DecanterEventNotifier notifier = new DecanterEventNotifier();
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
         notifier.setCamelContextMatcher("foo");
 
         DefaultCamelContext camelContext = createCamelContext(notifier);
@@ -88,7 +87,7 @@
     public void testRouteIdFilter() throws Exception {
         MockEventAdmin eventAdmin = new MockEventAdmin();
         DecanterEventNotifier notifier = new DecanterEventNotifier();
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
         notifier.setCamelContextMatcher(".*");
         notifier.setRouteMatcher("foo");
 
@@ -104,7 +103,7 @@
     public void testIgnoredEvents() throws Exception {
         MockEventAdmin eventAdmin = new MockEventAdmin();
         DecanterEventNotifier notifier = new DecanterEventNotifier();
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
         notifier.setIgnoreCamelContextEvents(true);
 
         DefaultCamelContext camelContext = createCamelContext(notifier);
@@ -121,7 +120,7 @@
         DecanterEventNotifier notifier = new DecanterEventNotifier();
         notifier.setIgnoreCamelContextEvents(true);
         notifier.setIgnoreRouteEvents(true);
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
         notifier.setExtender(new TestExtender());
 
         DefaultCamelContext camelContext = createCamelContext(notifier);
diff --git a/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java b/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java
index 9757e0d..988bc25 100644
--- a/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java
+++ b/collector/configadmin/src/main/java/org/apache/karaf/decanter/collector/configadmin/ConfigAdminCollector.java
@@ -27,6 +27,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@
     @Reference
     private ConfigurationAdmin configurationAdmin;
 
-    private Dictionary<String, Object> properties;
+    private Dictionary<String, Object> config;
 
     @Activate
     public void activate(ComponentContext componentContext) {
@@ -57,7 +58,7 @@
     }
 
     public void activate(Dictionary<String, Object> properties) {
-        this.properties = properties;
+        this.config = properties;
     }
 
     @Override
@@ -94,12 +95,14 @@
         }
 
         try {
-            PropertiesPreparator.prepare(data, properties);
+            PropertiesPreparator.prepare(data, config);
         } catch (Exception e) {
             // nothing to do
         }
 
-        dispatcher.postEvent(new Event("decanter/collect/configadmin", data));
+        String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/configadmin";
+
+        dispatcher.postEvent(new Event(topic, data));
     }
 
 }
diff --git a/collector/dropwizard/pom.xml b/collector/dropwizard/pom.xml
index d922421..56d7ff7 100644
--- a/collector/dropwizard/pom.xml
+++ b/collector/dropwizard/pom.xml
@@ -35,6 +35,10 @@
 
     <dependencies>
         <dependency>
+            <groupId>org.apache.karaf.decanter.collector</groupId>
+            <artifactId>org.apache.karaf.decanter.collector.utils</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.dropwizard.metrics</groupId>
             <artifactId>metrics-core</artifactId>
             <version>4.2.3</version>
diff --git a/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java b/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java
index c3b175b..571a9c0 100644
--- a/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java
+++ b/collector/dropwizard/src/main/java/org/apache/karaf/decanter/collector/dropwizard/DecanterReporterCollector.java
@@ -19,13 +19,18 @@
 package org.apache.karaf.decanter.collector.dropwizard;
 
 import com.codahale.metrics.*;
+import org.apache.karaf.decanter.collector.utils.PropertiesPreparator;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 
 import java.net.InetAddress;
+import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -45,6 +50,17 @@
     @Reference(cardinality = ReferenceCardinality.OPTIONAL)
     public MetricSet metricRegistry;
 
+    public Dictionary<String, Object> config;
+
+    @Activate
+    public void activate(ComponentContext componentContext) {
+        activate(componentContext.getProperties());
+    }
+
+    public void activate(Dictionary<String, Object> config) {
+        this.config = config;
+    }
+
     @Override
     public void run() {
         Map<String, Metric> metrics = metricRegistry.getMetrics();
@@ -95,7 +111,16 @@
                 data.put("Mean Rate", timer.getMeanRate());
                 populateSnapshot(timer.getSnapshot(), data);
             }
-            Event event = new Event("decanter/collect/dropwizard", data);
+
+            try {
+                PropertiesPreparator.prepare(data, config);
+            } catch (Exception e) {
+                // nothing to do
+            }
+
+            String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/dropwizard";
+
+            Event event = new Event(topic, data);
             dispatcher.postEvent(event);
         }
     }
diff --git a/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java b/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java
index 5122bfb..c3a0f08 100644
--- a/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java
+++ b/collector/druid/src/main/java/org/apache/karaf/decanter/collector/druid/DruidCollector.java
@@ -24,6 +24,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,7 +81,7 @@
                     data.put("query", key.substring("query.".length()));
                     data.putAll(executeQuery(druidBroker, (String) config.get(key)));
                     PropertiesPreparator.prepare(data, config);
-                    String topic = (config.get("topic") != null) ? (String) config.get("topic") : "decanter/collect/druid";
+                    String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/druid";
                     dispatcher.postEvent(new Event(topic, data));
                 } catch (Exception e) {
                     LOGGER.warn("Can't execute query {}", key.substring("query.".length()), e);
diff --git a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
index 03cad36..8ef9176 100644
--- a/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
+++ b/collector/elasticsearch/src/main/java/org/apache/karaf/decanter/collector/elasticsearch/ElasticsearchCollector.java
@@ -43,6 +43,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +66,7 @@
     @Reference
     private EventAdmin dispatcher;
 
-    private Dictionary<String, Object> configuration;
+    private Dictionary<String, Object> config;
     private RestHighLevelClient restClient;
 
     @Activate
@@ -74,7 +75,7 @@
     }
 
     public void activate(Dictionary<String, Object> configuration) {
-        this.configuration = configuration;
+        this.config = configuration;
         String addressesString = (configuration.get("addresses") != null) ? configuration.get("addresses").toString() : "http://localhost:9200";
         String username = (configuration.get("username") != null) ? configuration.get("username").toString() : null;
         String password = (configuration.get("password") != null) ? configuration.get("password").toString() : null;
@@ -126,12 +127,12 @@
     public void run() {
         SearchRequest searchRequest = new SearchRequest();
 
-        String index = (configuration.get("index") != null) ? configuration.get("index").toString() : "decanter";
+        String index = (config.get("index") != null) ? config.get("index").toString() : "decanter";
         searchRequest.indices(index);
 
         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
 
-        String query = (configuration.get("query") != null) ? configuration.get("query").toString() : null;
+        String query = (config.get("query") != null) ? config.get("query").toString() : null;
         QueryBuilder queryBuilder;
         if (query == null) {
             queryBuilder = QueryBuilders.matchAllQuery();
@@ -139,17 +140,17 @@
             queryBuilder = QueryBuilders.queryStringQuery(query);
         }
         searchSourceBuilder.query(queryBuilder);
-        String fromString = (configuration.get("from") != null) ? configuration.get("from").toString() : null;
+        String fromString = (config.get("from") != null) ? config.get("from").toString() : null;
         if (fromString != null) {
             int from = Integer.parseInt(fromString);
             searchSourceBuilder.from(from);
         }
-        String sizeString = (configuration.get("size") != null) ? configuration.get("size").toString() : null;
+        String sizeString = (config.get("size") != null) ? config.get("size").toString() : null;
         if (sizeString != null) {
             int size = Integer.parseInt(sizeString);
             searchSourceBuilder.size(size);
         }
-        String timeoutString = (configuration.get("timeout") != null) ? configuration.get("timeout").toString() : null;
+        String timeoutString = (config.get("timeout") != null) ? config.get("timeout").toString() : null;
         if (timeoutString != null) {
             int timeout = Integer.parseInt(timeoutString);
             searchSourceBuilder.timeout(new TimeValue(timeout, TimeUnit.SECONDS));
@@ -176,12 +177,14 @@
             LOGGER.error("Can't query elasticsearch", e);
         }
         try {
-            PropertiesPreparator.prepare(data, configuration);
+            PropertiesPreparator.prepare(data, config);
         } catch (Exception e) {
             LOGGER.warn("Can't prepare event", e);
         }
 
-        dispatcher.postEvent(new Event("decanter/collect/elasticsearch", data));
+        String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/elasticsearch";
+
+        dispatcher.postEvent(new Event(topic, data));
     }
 
     /**
diff --git a/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java b/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java
index 44d79b0..b29aa33 100644
--- a/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java
+++ b/collector/eventadmin/src/main/java/org/apache/karaf/decanter/collector/eventadmin/EventCollector.java
@@ -23,6 +23,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.osgi.service.event.EventHandler;
 
 import javax.security.auth.Subject;
@@ -38,11 +39,11 @@
     @Reference
     public EventAdmin dispatcher;
 
-    private Dictionary<String, Object> properties;
+    private Dictionary<String, Object> config;
 
     @Activate
     public void activate(ComponentContext context) {
-        properties = context.getProperties();
+        config = context.getProperties();
     }
 
     @Override
@@ -68,7 +69,7 @@
         }
 
         try {
-            PropertiesPreparator.prepare(data, properties);
+            PropertiesPreparator.prepare(data, config);
         } catch (Exception e) {
             // nothing to do
         }
diff --git a/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java b/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
index 8f9460d..8cbc143 100644
--- a/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
+++ b/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
@@ -36,6 +36,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +59,8 @@
     private String path;
     private String regex;
     private Pattern compiledRegex;
+
+    private String topic;
     
     /**
      * additional properties provided by the user
@@ -88,6 +91,7 @@
         if (regex != null) {
             compiledRegex  = Pattern.compile(regex);
         }
+        topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/file/";
     }
     
     @Deactivate
@@ -120,7 +124,7 @@
             LOGGER.warn("Can't fully prepare data for the dispatcher", e);
         }
 
-        Event event = new Event("decanter/collect/file/" + type, data);
+        Event event = new Event(topic + type, data);
         dispatcher.postEvent(event);
     }
 
diff --git a/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java b/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java
index 04dbd0c..ef9d4e7 100644
--- a/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java
+++ b/collector/jdbc/src/main/java/org/apache/karaf/decanter/collector/jdbc/JdbcCollector.java
@@ -58,7 +58,7 @@
     private final static Logger LOGGER = LoggerFactory.getLogger(JdbcCollector.class);
 
     private String query;
-    private String dispatcherTopic;
+    private String topic;
     private Dictionary<String, Object> properties;
     private Connection connection;
     private PreparedStatement preparedStatement;
@@ -66,15 +66,15 @@
     @Activate
     public void activate(ComponentContext context) throws Exception {
         properties = context.getProperties();
-        open(properties);
+        activate(properties);
     }
 
-    public void open(Dictionary<String, Object> config)  throws Exception {
+    public void activate(Dictionary<String, Object> config)  throws Exception {
         query = getProperty(config, "query", null);
         if (query == null) {
             throw new IllegalStateException("Query is mandatory");
         }
-        dispatcherTopic = getProperty(config, EventConstants.EVENT_TOPIC, "decanter/collect/jdbc");
+        topic = getProperty(config, EventConstants.EVENT_TOPIC, "decanter/collect/jdbc");
 
         connection = dataSource.getConnection();
         preparedStatement = connection.prepareStatement(query);
@@ -101,7 +101,7 @@
         List<Map<String, Object>> dataRows = query();
 
         for (Map<String, Object> data : dataRows) {
-            Event event = new Event(dispatcherTopic, data);
+            Event event = new Event(topic, data);
             dispatcher.postEvent(event);
         }
     }
diff --git a/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java b/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java
index 1d2ecb6..8a464ab 100644
--- a/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java
+++ b/collector/jdbc/src/test/java/org/apache/karaf/decanter/collector/jdbc/TestJdbcCollector.java
@@ -62,7 +62,7 @@
         collector.dataSource = dataSource;
         Dictionary<String, Object> config = new Hashtable<>();
         config.put("query", "select * from TEST");
-        collector.open(config);
+        collector.activate(config);
 
         List<Map<String, Object>> dataRows = collector.query();
         Assert.assertEquals(2, dataRows.size());
diff --git a/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java b/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java
index 4573bbf..8e98d7e 100644
--- a/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java
+++ b/collector/jetty/src/main/java/org/apache/karaf/decanter/collector/jetty/DecanterCollectorJettyHandler.java
@@ -26,6 +26,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -155,7 +156,8 @@
         } catch (Exception e) {
             // nothing to do
         }
-        Event event = new Event("decanter/collect/jetty", data);
+        String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/jetty";
+        Event event = new Event(topic, data);
         dispatcher.postEvent(event);
     }
 
diff --git a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
index 495cdad..35a3b0a 100644
--- a/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
+++ b/collector/jmx/src/main/java/org/apache/karaf/decanter/collector/jmx/JmxCollector.java
@@ -38,6 +38,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -149,13 +150,15 @@
                 names.addAll(connection.queryNames(getObjectName(null), null));
             }
 
+            String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/jmx/";
+
             for (ObjectName name : names) {
                 LOGGER.debug("Harvesting {}", name);
                 try {
                     Map<String, Object> data = harvester.harvestBean(name);
                     PropertiesPreparator.prepare(data, properties);
                     data.put("host", host);
-                    Event event = new Event("decanter/collect/jmx/" + this.type + "/" + getTopic(name), data);
+                    Event event = new Event(topic + this.type + "/" + getTopic(name), data);
                     LOGGER.debug("Posting for {}", name);
                     dispatcher.postEvent(event);
                 } catch (Exception e) {
@@ -174,7 +177,7 @@
                     Map<String, Object> data = harvester.executeOperation(operation, objectName, operationName, arguments, signatures);
                     PropertiesPreparator.prepare(data, properties);
                     data.put("host", host);
-                    Event event = new Event("decanter/collect/jmx/" + this.type + "/" + getTopic(objectName), data);
+                    Event event = new Event(topic + this.type + "/" + getTopic(objectName), data);
                     dispatcher.postEvent(event);
                 } else {
                     LOGGER.warn("{} is not well configured ({})", operation, raw);
diff --git a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java
index 98d64de..5ca4ff2 100644
--- a/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java
+++ b/collector/log/src/main/java/org/apache/karaf/decanter/collector/log/LogCollector.java
@@ -33,6 +33,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -116,8 +117,8 @@
         if (loggerName == null || loggerName.isEmpty()) {
             loggerName = "default";
         }
-        String topic = "decanter/collect/log/" + cleanLoggerName(loggerName);
-        this.dispatcher.postEvent(new Event(topic, data));
+        String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/log/";
+        this.dispatcher.postEvent(new Event(topic + cleanLoggerName(loggerName), data));
     }
     
     /*
diff --git a/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java b/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
index 11a0486..ce79648 100644
--- a/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
+++ b/collector/log4j-socket/src/main/java/org/apache/karaf/decanter/collector/log/socket/SocketCollector.java
@@ -45,6 +45,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,12 +138,12 @@
             LOGGER.warn("Can't prepare data for the dispatcher", e);
         }
 
-        String topic = loggerName2Topic(loggingEvent.getLoggerName());
-        Event event = new Event(topic, data);
+        String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/log/";
+        Event event = new Event(loggerName2Topic(topic, loggingEvent.getLoggerName()), data);
         dispatcher.postEvent(event);
     }
 
-    static String loggerName2Topic(String loggerName) {
+    static String loggerName2Topic(String topic, String loggerName) {
         StringBuilder out = new StringBuilder();
         for (int c = 0; c < loggerName.length(); c++) {
             Character ch = loggerName.charAt(c);
@@ -156,7 +157,7 @@
         while (outSt.length() > 1 && outSt.endsWith("/")) {
             outSt = outSt.substring(0, outSt.length() - 1);
         }
-        return "decanter/collect/log/" + outSt.replace(".", "/");
+        return topic + outSt.replace(".", "/");
     }
 
     private void putLocation(Map<String, Object> data, LocationInfo loc) {
diff --git a/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java b/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
index 89e1688..2e462c9 100644
--- a/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
+++ b/collector/log4j-socket/src/test/java/org/apache/karaf/decanter/collector/log/socket/SocketCollectorTest.java
@@ -73,7 +73,7 @@
 
     @Test
     public void testLoggerName2Topic() {
-        String topic = SocketCollector.loggerName2Topic("test.[Tomcat].[localhost].[/]");
+        String topic = SocketCollector.loggerName2Topic("decanter/collect/log/", "test.[Tomcat].[localhost].[/]");
         Assert.assertEquals("decanter/collect/log/test/Tomcat/localhost", topic);
     }
 
diff --git a/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java b/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java
index e02a027..1148ba9 100644
--- a/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java
+++ b/collector/openstack/src/main/java/org/apache/karaf/decanter/collector/openstack/OpenstackCollector.java
@@ -24,6 +24,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,11 +82,7 @@
 
     public void activate(Dictionary<String, Object> config) throws Exception {
         this.config = config;
-        if (config.get("topic") != null) {
-            topic = (String) config.get("topic");
-        } else {
-            topic = "decanter/collect/openstack";
-        }
+        topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/openstack";
         if (config.get("openstack.identity") == null) {
             throw new IllegalStateException("openstack.identity is not configured");
         }
diff --git a/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java b/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java
index e725fe3..b555958 100644
--- a/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java
+++ b/collector/oshi/src/main/java/org/apache/karaf/decanter/collector/oshi/OshiCollector.java
@@ -23,6 +23,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import oshi.SystemInfo;
@@ -368,7 +369,9 @@
 
             PropertiesPreparator.prepare(data, properties);
 
-            dispatcher.postEvent(new Event("decanter/collect/oshi", data));
+            String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/oshi";
+
+            dispatcher.postEvent(new Event(topic, data));
         } catch (Exception e) {
             LOGGER.warn("Can't get oshi system metrics", e);
         }
diff --git a/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java b/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java
index e31b594..386c455 100644
--- a/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java
+++ b/collector/prometheus/src/main/java/org/apache/karaf/decanter/collector/prometheus/PrometheusCollector.java
@@ -23,6 +23,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -90,7 +91,8 @@
                 }
             }
             PropertiesPreparator.prepare(data, properties);
-            dispatcher.postEvent(new Event("decanter/collect/prometheus", data));
+            String topic = (properties.get(EventConstants.EVENT_TOPIC) != null) ? (String) properties.get(EventConstants.EVENT_TOPIC) : "decanter/collect/prometheus";
+            dispatcher.postEvent(new Event(topic, data));
         } catch (Exception e) {
             LOGGER.warn("Can't get Prometheus metrics", e);
             e.printStackTrace();
diff --git a/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java b/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java
index 8dff199..d604be3 100644
--- a/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java
+++ b/collector/redis/src/main/java/org/apache/karaf/decanter/collector/redis/RedisCollector.java
@@ -24,6 +24,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.redisson.Redisson;
 import org.redisson.api.RMap;
 import org.redisson.api.RedissonClient;
@@ -102,7 +103,8 @@
         } catch (Exception e) {
             LOGGER.warn("Can't prepare data", e);
         }
-        dispatcher.postEvent(new Event("decanter/collect/redis", data));
+        String topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/redis";
+        dispatcher.postEvent(new Event(topic, data));
     }
 
 }
diff --git a/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java b/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java
index d27d3c2..48908fc 100644
--- a/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java
+++ b/collector/rest-servlet/src/main/java/org/apache/karaf/decanter/collector/rest/servlet/RestServletCollector.java
@@ -39,6 +39,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +62,7 @@
 
     private final static Logger LOGGER = LoggerFactory.getLogger(RestServletCollector.class);
 
-    private String baseTopic;
+    private String topic;
     private Dictionary<String, Object> properties;
     private long maxRequestSize = 100000;
 
@@ -69,7 +70,7 @@
     @Activate
     public void activate(ComponentContext context) throws MalformedURLException {
         Dictionary<String, Object> props = context.getProperties();
-        this.baseTopic = getProperty(props, "topic", "decanter/collect/rest-servlet");
+        this.topic = getProperty(props, EventConstants.EVENT_TOPIC, "decanter/collect/rest-servlet");
         this.properties = props;
         if (this.properties.get("max.request.size") != null) {
             maxRequestSize = Long.parseLong((String)this.properties.get("max.request.size"));
@@ -101,7 +102,7 @@
 
             PropertiesPreparator.prepare(data, properties);
 
-            Event event = new Event(baseTopic, data);
+            Event event = new Event(topic, data);
             dispatcher.postEvent(event);
             resp.setStatus(HttpServletResponse.SC_CREATED);
             LOGGER.debug("Karaf Decanter REST Servlet Collector harvesting done");
diff --git a/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java b/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java
index 57e674d..4d534bf 100644
--- a/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java
+++ b/collector/rest/src/main/java/org/apache/karaf/decanter/collector/rest/RestCollector.java
@@ -32,6 +32,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,7 +78,7 @@
         this.config = config;
         this.url = new URL(getProperty(config, "url", "http://localhost:8181"));
         this.paths = getProperty(config, "paths", "").split(",");
-        this.topic = getProperty(config, "topic", "decanter/collect/rest");
+        this.topic = getProperty(config, EventConstants.EVENT_TOPIC, "decanter/collect/rest");
         this.requestMethod = getProperty(config, "request.method", "GET");
         this.user = getProperty(config, "user", null);
         this.password = getProperty(config, "password", null);
diff --git a/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java b/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java
index 6779680..04ad87d 100644
--- a/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java
+++ b/collector/snmp/src/main/java/org/apache/karaf/decanter/collector/snmp/SnmpPoller.java
@@ -21,6 +21,7 @@
 import org.osgi.service.component.annotations.*;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.snmp4j.*;
@@ -238,7 +239,8 @@
             data.put(variableBinding.getOid().toString(), variableBinding.getVariable().toString());
         }
         // send event
-        dispatcher.postEvent(new Event("decanter/collector/snmp", data));
+        String topic = (configuration.get(EventConstants.EVENT_TOPIC) != null) ? (String) configuration.get(EventConstants.EVENT_TOPIC) : "decanter/collector/snmp";
+        dispatcher.postEvent(new Event(topic, data));
     }
 
     private OctetString convertToOctetString(String value) {
diff --git a/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java b/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
index fb6d6e1..a7ebeff 100644
--- a/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
+++ b/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
@@ -34,6 +34,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,10 +71,7 @@
         if (config.get("url") == null) {
             throw new IllegalArgumentException("url property is mandatory");
         }
-        this.topic = "decanter/collect/soap";
-        if (config.get("topic") != null) {
-            this.topic = (String) config.get("topic");
-        }
+        topic = (config.get(EventConstants.EVENT_TOPIC) != null) ? (String) config.get(EventConstants.EVENT_TOPIC) : "decanter/collect/soap";
         url = new URL((String) config.get("url"));
         if (config.get("soap.request") == null) {
             throw new IllegalStateException("soap.request property is mandatory");
diff --git a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
index 996dfa5..cb1f210 100644
--- a/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
+++ b/collector/socket/src/main/java/org/apache/karaf/decanter/collector/socket/SocketCollector.java
@@ -57,7 +57,7 @@
     private boolean open;
     private ExecutorService executor;
     private Dictionary<String, Object> properties;
-    private String eventAdminTopic;
+    private String topic;
     private long maxRequestSize = 100000;
     
     @Reference
@@ -85,7 +85,7 @@
             this.protocol = Protocol.TCP;
         }
 
-        eventAdminTopic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket");
+        topic = getProperty(this.properties, EventConstants.EVENT_TOPIC, "decanter/collect/socket");
 
         switch (protocol) {
             case TCP:
@@ -181,7 +181,7 @@
                         data.put("type", "socket");
                         data.putAll(unmarshaller.unmarshal(new ByteArrayInputStream(line.getBytes())));
                         PropertiesPreparator.prepare(data, properties);
-                        Event event = new Event(eventAdminTopic, data);
+                        Event event = new Event(topic, data);
                         dispatcher.postEvent(event);
                     }
                 }
@@ -217,7 +217,7 @@
                     LOGGER.warn("Can't prepare data for the dispatcher", e);
                 }
 
-                Event event = new Event(eventAdminTopic, data);
+                Event event = new Event(topic, data);
                 dispatcher.postEvent(event);
                 datagramSocket.send(packet);
             } catch (EOFException e) {
diff --git a/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java b/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
index 599cf49..d21ba40 100644
--- a/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
+++ b/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
@@ -39,6 +39,7 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +66,7 @@
     @Activate
     public void activate(ComponentContext context) {
         this.properties = context.getProperties();
-        this.topic = context.getProperties().get("topic") != null ? String.class.cast(context.getProperties().get("topic")) : "decanter/collect/system/";
+        this.topic = context.getProperties().get(EventConstants.EVENT_TOPIC) != null ? String.class.cast(context.getProperties().get(EventConstants.EVENT_TOPIC)) : "decanter/collect/system/";
         if (!this.topic.endsWith("/")) {
             this.topic = this.topic + "/";
         }
diff --git a/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java b/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java
index 713ef91..38255a9 100644
--- a/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java
+++ b/itest/src/test/java/org/apache/karaf/decanter/itests/collector/CamelCollectorTest.java
@@ -22,7 +22,6 @@
 import org.apache.camel.core.osgi.OsgiDataFormatResolver;
 import org.apache.camel.core.osgi.OsgiDefaultCamelContext;
 import org.apache.camel.core.osgi.OsgiLanguageResolver;
-import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.karaf.decanter.collector.camel.DecanterEventNotifier;
 import org.apache.karaf.decanter.collector.camel.DecanterInterceptStrategy;
 import org.apache.karaf.itests.KarafTestSupport;
@@ -136,7 +135,7 @@
         // create route with notifier
         EventAdmin eventAdmin = getOsgiService(EventAdmin.class);
         DecanterEventNotifier notifier = new DecanterEventNotifier();
-        notifier.setEventAdmin(eventAdmin);
+        notifier.setDispatcher(eventAdmin);
 
         RouteBuilder builder = new RouteBuilder() {
             @Override
diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc b/manual/src/main/asciidoc/user-guide/collectors.adoc
index 4f45f64..0a986de 100644
--- a/manual/src/main/asciidoc/user-guide/collectors.adoc
+++ b/manual/src/main/asciidoc/user-guide/collectors.adoc
@@ -1242,6 +1242,18 @@
 
 You have to define the locations of the OpenStack APIs and if you enabled requesting the APIs or not.
 
+==== Target dispatcher topics
+
+All collectors use a default Decanter dispatcher topic name. However, you can change the topic name with the one of your choice.
+
+For that, you have to set `event.topics` property in the collector configuration.
+
+For instance:
+
+----
+event.topics=my/topic/name
+----
+
 ==== Customizing properties in collectors
 
 You can add, rename or remove properties collected by the collectors before sending it to the dispatcher.
diff --git a/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java b/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java
index 78c8284..bb68296 100644
--- a/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java
+++ b/processor/passthrough/src/main/java/org/apache/karaf/decanter/processor/passthrough/PassThroughProcessor.java
@@ -43,10 +43,7 @@
     @Activate
     public void activate(ComponentContext componentContext) {
         Dictionary<String, Object> properties = componentContext.getProperties();
-        String targetTopic = "decanter/process/passthrough";
-        if (properties.get("target.topics") != null) {
-            targetTopic = properties.get("target.topics").toString();
-        }
+        targetTopic = (properties.get("target.topics") != null) ? (String) properties.get("target.topics") : "decanter/process/passthrough";
         activate(targetTopic);
     }