UNOMI-102 : Add Camel config for export features
diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
index 45de3d6..7f45228 100644
--- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
+++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
@@ -32,13 +32,20 @@
     String IMPORT_EXPORT_CONFIG_TYPE_RECURRENT = "recurrent";
     String IMPORT_EXPORT_CONFIG_TYPE_ONESHOT = "oneshot";
 
-    String DIRECT_DEPOSIT_BUFFER = "direct:depositBuffer";
+    String DIRECT_IMPORT_DEPOSIT_BUFFER = "direct:depositImportBuffer";
+    String DIRECT_EXPORT_DEPOSIT_BUFFER = "direct:depositExportBuffer";
 
     String DIRECTION_FROM = "from";
     String DIRECTION_TO = "to";
 
     String HEADER_CONFIG_TYPE = "configType";
 
+    String HEADER_EXPORT_CONFIG = "exportConfig";
     String HEADER_FAILED_MESSAGE = "failedMessage";
     String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot";
+
+    String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE";
+    String DEFAULT_FILE_COLUMN_SEPARATOR = ",";
+
+    String DEFAULT_FILE_LINE_SEPARATOR = "\n";
 }
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
new file mode 100644
index 0000000..4525019
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.bean;
+
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.persistence.spi.PersistenceService;
+
+import java.util.List;
+
+/**
+ * Created by amidani on 28/06/2017.
+ */
+public class CollectProfileBean {
+    // Persistence backend used for the profile lookup; supplied through setPersistenceService.
+    private PersistenceService persistenceService;
+    // Queries Profile items on the "segments" field with the given value; the third query argument is passed as null.
+    public List<Profile> extractProfileBySegment(String segment) {
+        return persistenceService.query("segments", segment,null, Profile.class);
+    }
+
+    public void setPersistenceService(PersistenceService persistenceService) {
+        this.persistenceService = persistenceService;
+    }
+}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index 32ceba8..d6ca24b 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -27,13 +27,11 @@
 import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.api.RouterConstants;
 import org.apache.unomi.router.api.services.ImportExportConfigurationService;
+import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
-import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
+import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
-import org.apache.unomi.router.core.route.ProfileExportCollectRouteBuilder;
-import org.apache.unomi.router.core.route.ProfileImportFromSourceRouteBuilder;
-import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder;
-import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder;
+import org.apache.unomi.router.core.route.*;
 import org.osgi.framework.Bundle;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.BundleEvent;
@@ -53,7 +51,8 @@
     private Logger logger = LoggerFactory.getLogger(RouterCamelContext.class.getName());
     private CamelContext camelContext;
     private UnomiStorageProcessor unomiStorageProcessor;
-    private RouteCompletionProcessor routeCompletionProcessor;
+    private ImportRouteCompletionProcessor importRouteCompletionProcessor;
+    private ExportRouteCompletionProcessor exportRouteCompletionProcessor;
     private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
     private ImportExportConfigurationService<ImportConfiguration> importConfigurationService;
     private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService;
@@ -102,19 +101,26 @@
         //Unomi sink route
         ProfileImportToUnomiRouteBuilder builderProcessor = new ProfileImportToUnomiRouteBuilder(kafkaProps, configType);
         builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor);
-        builderProcessor.setRouteCompletionProcessor(routeCompletionProcessor);
+        builderProcessor.setImportRouteCompletionProcessor(importRouteCompletionProcessor);
         builderProcessor.setJacksonDataFormat(jacksonDataFormat);
         builderProcessor.setContext(camelContext);
         camelContext.addRoutes(builderProcessor);
 
         //--EXPORT ROUTES
-        ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder();
+        ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, configType);
         profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService);
         profileExportCollectRouteBuilder.setPersistenceService(persistenceService);
         profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints);
+        profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
         profileExportCollectRouteBuilder.setContext(camelContext);
         camelContext.addRoutes(profileExportCollectRouteBuilder);
 
+        ProfileExportProducerRouteBuilder profileExportProducerRouteBuilder = new ProfileExportProducerRouteBuilder(kafkaProps, configType);
+        profileExportProducerRouteBuilder.setExportRouteCompletionProcessor(exportRouteCompletionProcessor);
+        profileExportProducerRouteBuilder.setAllowedEndpoints(allowedEndpoints);
+        profileExportProducerRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
+        profileExportProducerRouteBuilder.setContext(camelContext);
+        camelContext.addRoutes(profileExportProducerRouteBuilder);
 
         camelContext.start();
 
@@ -174,10 +180,11 @@
         killExistingRoute(exportConfiguration.getItemId());
         //Handle transforming an import config oneshot <--> recurrent
         if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType())) {
-            ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder();
+            ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, configType);
             profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService);
             profileExportCollectRouteBuilder.setPersistenceService(persistenceService);
             profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints);
+            profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
             profileExportCollectRouteBuilder.setContext(camelContext);
             camelContext.addRoutes(profileExportCollectRouteBuilder);
         }
@@ -191,8 +198,12 @@
         this.unomiStorageProcessor = unomiStorageProcessor;
     }
 
-    public void setRouteCompletionProcessor(RouteCompletionProcessor routeCompletionProcessor) {
-        this.routeCompletionProcessor = routeCompletionProcessor;
+    public void setImportRouteCompletionProcessor(ImportRouteCompletionProcessor importRouteCompletionProcessor) {
+        this.importRouteCompletionProcessor = importRouteCompletionProcessor;
+    }
+
+    public void setExportRouteCompletionProcessor(ExportRouteCompletionProcessor exportRouteCompletionProcessor) {
+        this.exportRouteCompletionProcessor = exportRouteCompletionProcessor;
     }
 
     public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) {
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
index 8e6ab36..76bd8a6 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
@@ -19,7 +19,6 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.core.context.RouterCamelContext;
 
 /**
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
new file mode 100644
index 0000000..1b4d1da
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.unomi.router.api.ExportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.api.services.ImportExportConfigurationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Created by amidani on 29/06/2017.
+ */
+public class ExportRouteCompletionProcessor implements Processor {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExportRouteCompletionProcessor.class.getName());
+    private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService;
+    private int executionsHistorySize;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        // Appends this run to the export configuration's execution history and saves the configuration.
+        ExportConfiguration exportConfig = (ExportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_EXPORT_CONFIG);
+
+        Map<String, Object> execution = new HashMap<String, Object>();
+        execution.put("date", ((Date) exchange.getProperty("CamelCreatedTimestamp")).getTime());
+        execution.put("extractedProfiles", exchange.getProperty("CamelSplitSize"));
+        // Load the configuration by item id through the service; the history update is applied to that instance.
+        ExportConfiguration exportConfiguration = exportConfigurationService.load(exportConfig.getItemId());
+        // History full: drop the oldest entry. isEmpty() guards get(0) when executionsHistorySize is 0 or negative.
+        if (!exportConfiguration.getExecutions().isEmpty() && exportConfiguration.getExecutions().size() >= executionsHistorySize) {
+            int oldestExecIndex = 0;
+            long oldestExecDate = (Long) exportConfiguration.getExecutions().get(0).get("date");
+            for (int i = 1; i < exportConfiguration.getExecutions().size(); i++) {
+                if ((Long) exportConfiguration.getExecutions().get(i).get("date") < oldestExecDate) {
+                    oldestExecDate = (Long) exportConfiguration.getExecutions().get(i).get("date");
+                    oldestExecIndex = i;
+                }
+            }
+            exportConfiguration.getExecutions().remove(oldestExecIndex);
+        }
+
+        exportConfiguration.getExecutions().add(execution);
+        exportConfigurationService.save(exportConfiguration);
+
+        logger.info("Processing route {} completed.", exchange.getFromRouteId());
+    }
+
+    public void setExportConfigurationService(ImportExportConfigurationService<ExportConfiguration> exportConfigurationService) {
+        this.exportConfigurationService = exportConfigurationService;
+    }
+
+    public void setExecutionsHistorySize(int executionsHistorySize) {
+        this.executionsHistorySize = executionsHistorySize;
+    }
+}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
similarity index 97%
rename from extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
rename to extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
index b522426..edb7391 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
@@ -21,8 +21,8 @@
 import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.api.ImportLineError;
 import org.apache.unomi.router.api.ProfileToImport;
-import org.apache.unomi.router.api.services.ImportExportConfigurationService;
 import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.api.services.ImportExportConfigurationService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,9 +31,9 @@
 /**
  * Created by amidani on 14/06/2017.
  */
-public class RouteCompletionProcessor implements Processor {
+public class ImportRouteCompletionProcessor implements Processor {
 
-    private static final Logger logger = LoggerFactory.getLogger(RouteCompletionProcessor.class.getName());
+    private static final Logger logger = LoggerFactory.getLogger(ImportRouteCompletionProcessor.class.getName());
     private ImportExportConfigurationService<ImportConfiguration> importConfigurationService;
     private int executionsHistorySize;
 
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
new file mode 100644
index 0000000..6f83741
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.router.api.ExportConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Created by amidani on 28/06/2017.
+ */
+public class LineBuildProcessor implements Processor {
+
+    private static final Logger logger = LoggerFactory.getLogger(LineBuildProcessor.class);
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        ExportConfiguration exportConfiguration = (ExportConfiguration) exchange.getIn().getHeader("exportConfig");
+        exchange.getIn().setHeader("destination", exportConfiguration.getProperty("destination"));
+        Profile profile = exchange.getIn().getBody(Profile.class);
+        // "mapping" is read by column index as a string ("0", "1", ...); each value names the profile property for that column.
+        Map<String, String> mapping = (Map<String, String>) exportConfiguration.getProperty("mapping");
+        StringBuilder lineToWrite = new StringBuilder();
+        for (int i = 0; i < mapping.size(); i++) {
+            if (i > 0) {
+                lineToWrite.append(exportConfiguration.getColumnSeparator());
+            }
+            Object propertyValue = profile.getProperty(mapping.get(String.valueOf(i)));
+            lineToWrite.append(propertyValue != null ? propertyValue : "");
+        }
+        // Replace the Profile body with the delimited line; values are appended as-is, with no quoting of the separator.
+        exchange.getIn().setBody(lineToWrite.toString(), String.class);
+    }
+
+}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java
index dd70033..885713a 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java
@@ -18,7 +18,6 @@
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.model.rest.RestBindingMode;
-import org.apache.unomi.api.services.ConfigSharingService;
 import org.apache.unomi.router.api.ExportConfiguration;
 import org.apache.unomi.router.api.ImportConfiguration;
 import org.apache.unomi.router.core.context.RouterCamelContext;
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
index 5c3015e..b67859a 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
@@ -16,21 +16,25 @@
  */
 package org.apache.unomi.router.core.route;
 
-import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.model.ProcessorDefinition;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.unomi.api.Profile;
 import org.apache.unomi.persistence.spi.PersistenceService;
 import org.apache.unomi.router.api.ExportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
 import org.apache.unomi.router.api.services.ImportExportConfigurationService;
+import org.apache.unomi.router.core.bean.CollectProfileBean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Created by amidani on 27/06/2017.
  */
-public class ProfileExportCollectRouteBuilder extends RouteBuilder {
+public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder {
 
     private static final Logger logger = LoggerFactory.getLogger(ProfileExportCollectRouteBuilder.class);
 
@@ -38,7 +42,9 @@
     private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService;
     private PersistenceService persistenceService;
 
-    private String allowedEndpoints;
+    public ProfileExportCollectRouteBuilder(Map<String, String> kafkaProps, String configType) {
+        super(kafkaProps, configType);
+    }
 
     @Override
     public void configure() throws Exception {
@@ -48,16 +54,37 @@
             exportConfigurationList = exportConfigurationService.getAll();
         }
 
+        CollectProfileBean collectProfileBean = new CollectProfileBean();
+        collectProfileBean.setPersistenceService(persistenceService);
+
+
         //Loop on multiple export configuration
         for (final ExportConfiguration exportConfiguration : exportConfigurationList) {
-            String endpoint = (String) exportConfiguration.getProperties().get("destination");
-
-            if (StringUtils.isNotBlank(endpoint) && allowedEndpoints.contains(endpoint.substring(0, endpoint.indexOf(':')))) {
-                List<Profile> profilesCollected = persistenceService.query("segments", (String) exportConfiguration.getProperties().get("segments"),
-                         null, Profile.class);
-                logger.info("Collected +++{}+++ profiles.", profilesCollected.size());
+            if (exportConfiguration.getProperties() != null && exportConfiguration.getProperties().size() > 0) {
+                if ((Map<String, String>) exportConfiguration.getProperties().get("mapping") != null) {
+                    String destinationEndpoint = (String) exportConfiguration.getProperties().get("destination");
+                    if (StringUtils.isNotBlank(destinationEndpoint) && allowedEndpoints.contains(destinationEndpoint.substring(0, destinationEndpoint.indexOf(':')))) {
+                        ProcessorDefinition prDef = from("timer://collectProfile?fixedRate=true&period=" + (String) exportConfiguration.getProperties().get("period"))
+                                .autoStartup(exportConfiguration.isActive())
+                                .bean(collectProfileBean, "extractProfileBySegment(" + exportConfiguration.getProperties().get("segment") + ")")
+                                .split(body())
+                                .marshal(jacksonDataFormat)
+                                .convertBodyTo(String.class)
+                                .setHeader(RouterConstants.HEADER_EXPORT_CONFIG, constant(exportConfiguration))
+                                .log(LoggingLevel.DEBUG, "BODY : ${body}");
+                        if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+                            prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+                        } else {
+                            prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+                        }
+                    } else {
+                        logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", destinationEndpoint.substring(0, destinationEndpoint.indexOf(':')), exportConfiguration.getItemId());
+                    }
+                } else {
+                    logger.warn("Mapping is null in export configuration, route {} will be skipped!", exportConfiguration.getItemId());
+                }
             } else {
-                logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), exportConfiguration.getItemId());
+                logger.warn("Export configuration incomplete, route {} will be skipped!", exportConfiguration.getItemId());
             }
         }
     }
@@ -74,8 +101,4 @@
         this.persistenceService = persistenceService;
     }
 
-    public void setAllowedEndpoints(String allowedEndpoints) {
-        this.allowedEndpoints = allowedEndpoints;
-    }
-
 }
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
new file mode 100644
index 0000000..0b0b60a
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
+import org.apache.unomi.router.core.processor.LineBuildProcessor;
+import org.apache.unomi.router.core.strategy.StringLinesAggregationStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Created by amidani on 28/06/2017.
+ */
+public class ProfileExportProducerRouteBuilder extends RouterAbstractRouteBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProfileExportProducerRouteBuilder.class);
+
+    private ExportRouteCompletionProcessor exportRouteCompletionProcessor;
+
+    public ProfileExportProducerRouteBuilder(Map<String, String> kafkaProps, String configType) {
+        super(kafkaProps, configType);
+    }
+
+    @Override
+    public void configure() throws Exception {
+        // Defines the route that consumes from the export deposit buffer and sends the aggregated lines to the destination.
+        logger.info("Configure Recurrent Route 'Export :: Data Producer'");
+        // The source comes from getEndpointURI and is used as a KafkaEndpoint or as a String URI depending on configType.
+        RouteDefinition rtDef;
+        if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+            rtDef = from((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+        } else {
+            rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+        }
+        // Per message: unmarshal, build one line, aggregate until CamelAggregatedSize equals CamelSplitSize, then complete and send.
+        rtDef.unmarshal(jacksonDataFormat)
+                .process(new LineBuildProcessor())
+                .aggregate(constant(true), new StringLinesAggregationStrategy())
+                .completionPredicate(exchangeProperty("CamelSplitSize").isEqualTo(exchangeProperty("CamelAggregatedSize")))
+                .eagerCheckCompletion()
+                .process(exportRouteCompletionProcessor)
+                .toD("${in.header.exportConfig.getProperty('destination')}");
+        // NOTE(review): constant(true) gives every exchange the same correlation key; confirm concurrent exports cannot interleave.
+    }
+
+    public void setExportRouteCompletionProcessor(ExportRouteCompletionProcessor exportRouteCompletionProcessor) {
+        this.exportRouteCompletionProcessor = exportRouteCompletionProcessor;
+    }
+
+}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
index 7f54884..2dc87f3 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
@@ -23,8 +23,8 @@
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.unomi.router.api.ImportConfiguration;
-import org.apache.unomi.router.api.services.ImportExportConfigurationService;
 import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.api.services.ImportExportConfigurationService;
 import org.apache.unomi.router.core.exception.BadProfileDataFormatException;
 import org.apache.unomi.router.core.processor.LineSplitFailureHandler;
 import org.apache.unomi.router.core.processor.LineSplitProcessor;
@@ -39,15 +39,13 @@
  * Created by amidani on 26/04/2017.
  */
 
-public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRouteBuilder {
+public class ProfileImportFromSourceRouteBuilder extends RouterAbstractRouteBuilder {
 
     private static final Logger logger = LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName());
 
     private List<ImportConfiguration> importConfigurationList;
     private ImportExportConfigurationService<ImportConfiguration> importConfigurationService;
 
-    private String allowedEndpoints;
-
     public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps, String configType) {
         super(kafkaProps, configType);
     }
@@ -67,9 +65,9 @@
                 .process(new LineSplitFailureHandler());
 
         if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
-            prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM));
+            prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         } else {
-            prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+            prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         }
 
         //Loop on multiple import configuration
@@ -113,9 +111,9 @@
                             .convertBodyTo(String.class);
 
                     if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
-                        prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM));
+                        prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
                     } else {
-                        prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+                        prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
                     }
                 } else {
                     logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), importConfiguration.getItemId());
@@ -132,8 +130,4 @@
         this.importConfigurationService = importConfigurationService;
     }
 
-    public void setAllowedEndpoints(String allowedEndpoints) {
-        this.allowedEndpoints = allowedEndpoints;
-    }
-
 }
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
index a94b5ed..0913876 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
@@ -17,7 +17,6 @@
 package org.apache.unomi.router.core.route;
 
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.component.jackson.JacksonDataFormat;
 import org.apache.camel.component.kafka.KafkaEndpoint;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.unomi.router.api.RouterConstants;
@@ -33,15 +32,12 @@
 /**
  * Created by amidani on 22/05/2017.
  */
-public class ProfileImportOneShotRouteBuilder extends ProfileImportAbstractRouteBuilder {
+public class ProfileImportOneShotRouteBuilder extends RouterAbstractRouteBuilder {
 
     private Logger logger = LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName());
-
     private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
     private String uploadDir;
 
-    private final String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE";
-
     public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps, String configType) {
         super(kafkaProps, configType);
     }
@@ -57,27 +53,27 @@
                 .process(new LineSplitFailureHandler());
 
         if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
-            prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM));
+            prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         } else {
-            prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+            prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         }
 
         LineSplitProcessor lineSplitProcessor = new LineSplitProcessor();
 
-        ProcessorDefinition prDef = from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m")
-                .routeId(IMPORT_ONESHOT_ROUTE_ID)
+        ProcessorDefinition prDef = from("file://" + uploadDir + "?include=.*.csv&consumer.delay=1m")
+                .routeId(RouterConstants.IMPORT_ONESHOT_ROUTE_ID)
                 .autoStartup(true)
                 .process(importConfigByFileNameProcessor)
                 .split(bodyAs(String.class).tokenize("${in.header.importConfigOneShot.getLineSeparator}"))
-                .setHeader("configType", constant(configType))
+                .setHeader(RouterConstants.HEADER_CONFIG_TYPE, constant(configType))
                 .process(lineSplitProcessor)
                 .to("log:org.apache.unomi.router?level=INFO")
                 .marshal(jacksonDataFormat)
                 .convertBodyTo(String.class);
-        if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)){
-            prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM));
+        if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+            prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         } else {
-            prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+            prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         }
     }
 
@@ -88,8 +84,4 @@
     public void setUploadDir(String uploadDir) {
         this.uploadDir = uploadDir;
     }
-
-    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
-        this.jacksonDataFormat = jacksonDataFormat;
-    }
 }
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
index d75977b..759dde4 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
@@ -17,11 +17,10 @@
 package org.apache.unomi.router.core.route;
 
 import org.apache.camel.LoggingLevel;
-import org.apache.camel.component.jackson.JacksonDataFormat;
 import org.apache.camel.component.kafka.KafkaEndpoint;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.unomi.router.api.RouterConstants;
-import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
+import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
 import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
 import org.apache.unomi.router.core.strategy.ArrayListAggregationStrategy;
 import org.slf4j.Logger;
@@ -32,12 +31,12 @@
 /**
  * Created by amidani on 26/04/2017.
  */
-public class ProfileImportToUnomiRouteBuilder extends ProfileImportAbstractRouteBuilder {
+public class ProfileImportToUnomiRouteBuilder extends RouterAbstractRouteBuilder {
 
     private Logger logger = LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName());
 
     private UnomiStorageProcessor unomiStorageProcessor;
-    private RouteCompletionProcessor routeCompletionProcessor;
+    private ImportRouteCompletionProcessor importRouteCompletionProcessor;
 
     public ProfileImportToUnomiRouteBuilder(Map<String, String> kafkaProps, String configType) {
         super(kafkaProps, configType);
@@ -50,9 +49,9 @@
 
         RouteDefinition rtDef;
         if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
-            rtDef = from((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_TO));
+            rtDef = from((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         } else {
-            rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO));
+            rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
         }
         rtDef.choice()
                 .when(header(RouterConstants.HEADER_FAILED_MESSAGE).isNull())
@@ -64,7 +63,7 @@
                 .aggregate(constant(true), new ArrayListAggregationStrategy())
                 .completionPredicate(exchangeProperty("CamelSplitComplete").isEqualTo("true"))
                 .eagerCheckCompletion()
-                .process(routeCompletionProcessor)
+                .process(importRouteCompletionProcessor)
                 .to("log:org.apache.unomi.router?level=INFO");
     }
 
@@ -72,11 +71,7 @@
         this.unomiStorageProcessor = unomiStorageProcessor;
     }
 
-    public void setRouteCompletionProcessor(RouteCompletionProcessor routeCompletionProcessor) {
-        this.routeCompletionProcessor = routeCompletionProcessor;
-    }
-
-    public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
-        this.jacksonDataFormat = jacksonDataFormat;
+    public void setImportRouteCompletionProcessor(ImportRouteCompletionProcessor importRouteCompletionProcessor) {
+        this.importRouteCompletionProcessor = importRouteCompletionProcessor;
     }
 }
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
similarity index 63%
rename from extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
rename to extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
index bacc38e..5db9917 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
@@ -29,49 +29,60 @@
 /**
  * Created by amidani on 13/06/2017.
  */
-public abstract class ProfileImportAbstractRouteBuilder extends RouteBuilder {
+public abstract class RouterAbstractRouteBuilder extends RouteBuilder {
 
     protected JacksonDataFormat jacksonDataFormat;
 
     protected String kafkaHost;
     protected String kafkaPort;
     protected String kafkaImportTopic;
+    protected String kafkaExportTopic;
     protected String kafkaImportGroupId;
-    protected String kafkaImportConsumerCount;
-    protected String kafkaImportAutoCommit;
+    protected String kafkaExportGroupId;
+    protected String kafkaConsumerCount;
+    protected String kafkaAutoCommit;
 
     protected String configType;
+    protected String allowedEndpoints;
 
-    public ProfileImportAbstractRouteBuilder(Map<String, String> kafkaProps, String configType) {
+    public RouterAbstractRouteBuilder(Map<String, String> kafkaProps, String configType) {
         this.kafkaHost = kafkaProps.get("kafkaHost");
         this.kafkaPort = kafkaProps.get("kafkaPort");
         this.kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
+        this.kafkaExportTopic = kafkaProps.get("kafkaExportTopic");
         this.kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
-        this.kafkaImportConsumerCount = kafkaProps.get("kafkaImportConsumerCount");
-        this.kafkaImportAutoCommit = kafkaProps.get("kafkaImportAutoCommit");
+        this.kafkaExportGroupId = kafkaProps.get("kafkaExportGroupId");
+        this.kafkaConsumerCount = kafkaProps.get("kafkaConsumerCount");
+        this.kafkaAutoCommit = kafkaProps.get("kafkaAutoCommit");
         this.configType = configType;
     }
 
-    public Object getEndpointURI(String direction) {
+    public Object getEndpointURI(String direction, String operationDepositBuffer) {
         Object endpoint;
         if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+            String kafkaTopic = kafkaImportTopic;
+            String kafkaGroupId = kafkaImportGroupId;
+            if (RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER.equals(operationDepositBuffer)) {
+                kafkaTopic = kafkaExportTopic;
+                kafkaGroupId = kafkaExportGroupId;
+            }
             //Prepare Kafka Deposit
             StringBuilder kafkaUri = new StringBuilder("kafka:");
-            kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic);
-            if (StringUtils.isNotBlank(kafkaImportGroupId)) {
-                kafkaUri.append("&groupId=" + kafkaImportGroupId);
+            kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaTopic);
+            if (StringUtils.isNotBlank(kafkaGroupId)) {
+                kafkaUri.append("&groupId=" + kafkaGroupId);
             }
             if (RouterConstants.DIRECTION_TO.equals(direction)) {
-                kafkaUri.append("&autoCommitEnable=" + kafkaImportAutoCommit + "&consumersCount=" + kafkaImportConsumerCount);
+                kafkaUri.append("&autoCommitEnable=" + kafkaAutoCommit + "&consumersCount=" + kafkaConsumerCount);
             }
             KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
             kafkaConfiguration.setBrokers(kafkaHost + ":" + kafkaPort);
-            kafkaConfiguration.setTopic(kafkaImportTopic);
-            kafkaConfiguration.setGroupId(kafkaImportGroupId);
+            kafkaConfiguration.setTopic(kafkaTopic);
+            kafkaConfiguration.setGroupId(kafkaGroupId);
             endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext()));
             ((KafkaEndpoint) endpoint).setConfiguration(kafkaConfiguration);
         } else {
-            endpoint = RouterConstants.DIRECT_DEPOSIT_BUFFER;
+            endpoint = operationDepositBuffer;
         }
 
         return endpoint;
@@ -80,4 +91,8 @@
     public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
         this.jacksonDataFormat = jacksonDataFormat;
     }
+
+    public void setAllowedEndpoints(String allowedEndpoints) {
+        this.allowedEndpoints = allowedEndpoints;
+    }
 }
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
index a53e34b..ca87ad3 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
@@ -18,8 +18,6 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
-import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
 
 import java.util.ArrayList;
 
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
new file mode 100644
index 0000000..5a69001
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.strategy;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.unomi.router.api.ExportConfiguration;
+
+/**
+ * Created by amidani on 29/06/2017.
+ */
+public class StringLinesAggregationStrategy implements AggregationStrategy {
+
+    /** Appends the new exchange's line to the aggregated body, joined by the export configuration's line separator. */
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            return newExchange; // first line: nothing to join yet, so no separator lookup is needed
+        }
+        ExportConfiguration exportConfig = newExchange.getIn().getHeader("exportConfig", ExportConfiguration.class);
+        // A missing header or separator falls back to "\n" rather than a NullPointerException or a literal "null"
+        String lineSeparator = exportConfig == null ? null : exportConfig.getLineSeparator();
+        String joined = oldExchange.getIn().getBody(String.class) + (lineSeparator == null ? "\n" : lineSeparator)
+                + newExchange.getIn().getBody(String.class);
+        oldExchange.getIn().setBody(joined);
+        return oldExchange;
+    }
+}
diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5ae1e9c..3b155b2 100644
--- a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -27,15 +27,16 @@
         <cm:default-properties>
             <cm:property name="config.type" value="nobroker"/>
             <cm:property name="config.allowedEndpoints" value="file,ftp"/>
-            <cm:property name="config.internalPort" value="8233"/>
             <cm:property name="kafka.host" value="localhost"/>
             <cm:property name="kafka.port" value="9092"/>
-            <cm:property name="kafka.import.topic" value="camel-deposit"/>
+            <cm:property name="kafka.import.topic" value="import-deposit"/>
+            <cm:property name="kafka.export.topic" value="export-deposit"/>
             <cm:property name="kafka.import.groupId" value="unomi-import-group"/>
-            <cm:property name="kafka.import.consumerCount" value="10"/>
-            <cm:property name="kafka.import.autoCommit" value="true"/>
+            <cm:property name="kafka.export.groupId" value="unomi-export-group"/>
+            <cm:property name="kafka.consumerCount" value="10"/>
+            <cm:property name="kafka.autoCommit" value="true"/>
             <cm:property name="import.oneshot.uploadDir" value="/tmp/oneshot_import_configs/"/>
-            <cm:property name="import.executionsHistory.size" value="5"/>
+            <cm:property name="executionsHistory.size" value="5"/>
         </cm:default-properties>
     </cm:property-placeholder>
 
@@ -43,9 +44,14 @@
         <property name="profileImportService" ref="profileImportService"/>
     </bean>
 
-    <bean id="routeCompletionProcessor" class="org.apache.unomi.router.core.processor.RouteCompletionProcessor">
+    <bean id="importRouteCompletionProcessor" class="org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor">
         <property name="importConfigurationService" ref="importConfigurationService"/>
-        <property name="executionsHistorySize" value="${import.executionsHistory.size}"/>
+        <property name="executionsHistorySize" value="${executionsHistory.size}"/>
+    </bean>
+
+    <bean id="exportRouteCompletionProcessor" class="org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor">
+        <property name="exportConfigurationService" ref="exportConfigurationService"/>
+        <property name="executionsHistorySize" value="${executionsHistory.size}"/>
     </bean>
 
     <bean id="importConfigByFileNameProcessor" class="org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor">
@@ -86,26 +92,29 @@
           init-method="initCamelContext" destroy-method="preDestroy">
         <property name="configType" value="${config.type}"/>
         <property name="allowedEndpoints" value="${config.allowedEndpoints}"/>
+        <property name="uploadDir" value="${import.oneshot.uploadDir}"/>
+        <property name="bundleContext" ref="blueprintBundleContext"/>
+        <property name="jacksonDataFormat" ref="jacksonDataFormat"/>
         <property name="kafkaProps">
             <map>
                 <entry key="kafkaHost" value="${kafka.host}"/>
                 <entry key="kafkaPort" value="${kafka.port}"/>
                 <entry key="kafkaImportTopic" value="${kafka.import.topic}"/>
+                <entry key="kafkaExportTopic" value="${kafka.export.topic}"/>
                 <entry key="kafkaImportGroupId" value="${kafka.import.groupId}"/>
-                <entry key="kafkaImportConsumerCount" value="${kafka.import.consumerCount}"/>
-                <entry key="kafkaImportAutoCommit" value="${kafka.import.autoCommit}"/>
+                <entry key="kafkaExportGroupId" value="${kafka.export.groupId}"/>
+                <entry key="kafkaConsumerCount" value="${kafka.consumerCount}"/>
+                <entry key="kafkaAutoCommit" value="${kafka.autoCommit}"/>
             </map>
         </property>
-        <property name="uploadDir" value="${import.oneshot.uploadDir}"/>
         <property name="unomiStorageProcessor" ref="unomiStorageProcessor"/>
-        <property name="routeCompletionProcessor" ref="routeCompletionProcessor"/>
+        <property name="importRouteCompletionProcessor" ref="importRouteCompletionProcessor"/>
+        <property name="exportRouteCompletionProcessor" ref="exportRouteCompletionProcessor"/>
         <property name="importConfigByFileNameProcessor" ref="importConfigByFileNameProcessor"/>
-        <property name="importConfigurationService" ref="importConfigurationService"/>
-        <property name="exportConfigurationService" ref="exportConfigurationService"/>
-        <property name="persistenceService" ref="persistenceService"/>
-        <property name="jacksonDataFormat" ref="jacksonDataFormat"/>
-        <property name="bundleContext" ref="blueprintBundleContext"/>
         <property name="configSharingService" ref="configSharingService" />
+        <property name="exportConfigurationService" ref="exportConfigurationService"/>
+        <property name="importConfigurationService" ref="importConfigurationService"/>
+        <property name="persistenceService" ref="persistenceService"/>
     </bean>
 
     <camel:camelContext id="httpEndpoint" xmlns="http://camel.apache.org/schema/blueprint">
@@ -116,6 +125,10 @@
         <property name="routerCamelContext" ref="camelContext"/>
     </bean>
 
+    <bean id="collectProfileBean" class="org.apache.unomi.router.core.bean.CollectProfileBean">
+        <property name="persistenceService" ref="persistenceService"/>
+    </bean>
+
     <reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService" />
     <reference id="httpService" interface="org.osgi.service.http.HttpService"/>
     <reference id="profileImportService" interface="org.apache.unomi.router.api.services.ProfileImportService"/>
diff --git a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
index 2aa385f..8f29d65 100644
--- a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
+++ b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
@@ -23,19 +23,18 @@
 #Kafka
 #kafka.host=localhost
 #kafka.port=9092
-#kafka.import.topic=camel-deposit
+#kafka.import.topic=import-deposit
+#kafka.export.topic=export-deposit
 #kafka.import.groupId=unomi-import-group
-#kafka.import.consumerCount=10
-#kafka.import.autoCommit=true
+#kafka.export.groupId=unomi-export-group
+#kafka.consumerCount=10
+#kafka.autoCommit=true
 
 #Import One Shot upload directory
 import.oneshot.uploadDir=${karaf.data}/tmp/unomi_oneshot_import_configs/
 
-#Import executions history size
-import.executionsHistory.size=5
+#Import/Export executions history size
+executionsHistory.size=5
 
 #Allowed source endpoints
-config.allowedEndpoints=file,ftp
-
-#Internal Camel REST services port (Not public - DO NOT OPEN TO PUBLIC)
-config.internalPort=8233
\ No newline at end of file
+config.allowedEndpoints=file,ftp
\ No newline at end of file