UNOMI-102 : Add Camel config for export features
diff --git a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
index 45de3d6..7f45228 100644
--- a/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
+++ b/extensions/router/router-api/src/main/java/org/apache/unomi/router/api/RouterConstants.java
@@ -32,13 +32,20 @@
String IMPORT_EXPORT_CONFIG_TYPE_RECURRENT = "recurrent";
String IMPORT_EXPORT_CONFIG_TYPE_ONESHOT = "oneshot";
- String DIRECT_DEPOSIT_BUFFER = "direct:depositBuffer";
+ String DIRECT_IMPORT_DEPOSIT_BUFFER = "direct:depositImportBuffer";
+ String DIRECT_EXPORT_DEPOSIT_BUFFER = "direct:depositExportBuffer";
String DIRECTION_FROM = "from";
String DIRECTION_TO = "to";
String HEADER_CONFIG_TYPE = "configType";
+ String HEADER_EXPORT_CONFIG = "exportConfig";
String HEADER_FAILED_MESSAGE = "failedMessage";
String HEADER_IMPORT_CONFIG_ONESHOT = "importConfigOneShot";
+
+ String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE";
+ String DEFAULT_FILE_COLUMN_SEPARATOR = ",";
+
+ String DEFAULT_FILE_LINE_SEPARATOR = "\n";
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
new file mode 100644
index 0000000..4525019
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/bean/CollectProfileBean.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.bean;
+
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.persistence.spi.PersistenceService;
+
+import java.util.List;
+
+/**
+ * Bean that queries the injected {@link PersistenceService} for {@link Profile} items on the "segments" field. Created by amidani on 28/06/2017.
+ */
+public class CollectProfileBean {
+    private PersistenceService persistenceService;
+
+    public void setPersistenceService(PersistenceService persistenceService) {
+        this.persistenceService = persistenceService;
+    }
+
+    public List<Profile> extractProfileBySegment(String segment) {
+        final List<Profile> profilesInSegment = persistenceService.query("segments", segment, null, Profile.class);
+        return profilesInSegment;
+    }
+}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
index 32ceba8..d6ca24b 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/context/RouterCamelContext.java
@@ -27,13 +27,11 @@
import org.apache.unomi.router.api.ImportConfiguration;
import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
+import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
import org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor;
-import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
+import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
-import org.apache.unomi.router.core.route.ProfileExportCollectRouteBuilder;
-import org.apache.unomi.router.core.route.ProfileImportFromSourceRouteBuilder;
-import org.apache.unomi.router.core.route.ProfileImportOneShotRouteBuilder;
-import org.apache.unomi.router.core.route.ProfileImportToUnomiRouteBuilder;
+import org.apache.unomi.router.core.route.*;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
@@ -53,7 +51,8 @@
private Logger logger = LoggerFactory.getLogger(RouterCamelContext.class.getName());
private CamelContext camelContext;
private UnomiStorageProcessor unomiStorageProcessor;
- private RouteCompletionProcessor routeCompletionProcessor;
+ private ImportRouteCompletionProcessor importRouteCompletionProcessor;
+ private ExportRouteCompletionProcessor exportRouteCompletionProcessor;
private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
private ImportExportConfigurationService<ImportConfiguration> importConfigurationService;
private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService;
@@ -102,19 +101,26 @@
//Unomi sink route
ProfileImportToUnomiRouteBuilder builderProcessor = new ProfileImportToUnomiRouteBuilder(kafkaProps, configType);
builderProcessor.setUnomiStorageProcessor(unomiStorageProcessor);
- builderProcessor.setRouteCompletionProcessor(routeCompletionProcessor);
+ builderProcessor.setImportRouteCompletionProcessor(importRouteCompletionProcessor);
builderProcessor.setJacksonDataFormat(jacksonDataFormat);
builderProcessor.setContext(camelContext);
camelContext.addRoutes(builderProcessor);
//--EXPORT ROUTES
- ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder();
+ ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, configType);
profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService);
profileExportCollectRouteBuilder.setPersistenceService(persistenceService);
profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints);
+ profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
profileExportCollectRouteBuilder.setContext(camelContext);
camelContext.addRoutes(profileExportCollectRouteBuilder);
+ ProfileExportProducerRouteBuilder profileExportProducerRouteBuilder = new ProfileExportProducerRouteBuilder(kafkaProps, configType);
+ profileExportProducerRouteBuilder.setExportRouteCompletionProcessor(exportRouteCompletionProcessor);
+ profileExportProducerRouteBuilder.setAllowedEndpoints(allowedEndpoints);
+ profileExportProducerRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
+ profileExportProducerRouteBuilder.setContext(camelContext);
+ camelContext.addRoutes(profileExportProducerRouteBuilder);
camelContext.start();
@@ -174,10 +180,11 @@
killExistingRoute(exportConfiguration.getItemId());
//Handle transforming an import config oneshot <--> recurrent
if (RouterConstants.IMPORT_EXPORT_CONFIG_TYPE_RECURRENT.equals(exportConfiguration.getConfigType())) {
- ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder();
+ ProfileExportCollectRouteBuilder profileExportCollectRouteBuilder = new ProfileExportCollectRouteBuilder(kafkaProps, configType);
profileExportCollectRouteBuilder.setExportConfigurationService(exportConfigurationService);
profileExportCollectRouteBuilder.setPersistenceService(persistenceService);
profileExportCollectRouteBuilder.setAllowedEndpoints(allowedEndpoints);
+ profileExportCollectRouteBuilder.setJacksonDataFormat(jacksonDataFormat);
profileExportCollectRouteBuilder.setContext(camelContext);
camelContext.addRoutes(profileExportCollectRouteBuilder);
}
@@ -191,8 +198,12 @@
this.unomiStorageProcessor = unomiStorageProcessor;
}
- public void setRouteCompletionProcessor(RouteCompletionProcessor routeCompletionProcessor) {
- this.routeCompletionProcessor = routeCompletionProcessor;
+ public void setImportRouteCompletionProcessor(ImportRouteCompletionProcessor importRouteCompletionProcessor) {
+ this.importRouteCompletionProcessor = importRouteCompletionProcessor;
+ }
+
+ public void setExportRouteCompletionProcessor(ExportRouteCompletionProcessor exportRouteCompletionProcessor) {
+ this.exportRouteCompletionProcessor = exportRouteCompletionProcessor;
}
public void setImportConfigByFileNameProcessor(ImportConfigByFileNameProcessor importConfigByFileNameProcessor) {
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
index 8e6ab36..76bd8a6 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ConfigUpdateProcessor.java
@@ -19,7 +19,6 @@
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.unomi.router.api.ImportConfiguration;
import org.apache.unomi.router.core.context.RouterCamelContext;
/**
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
new file mode 100644
index 0000000..1b4d1da
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ExportRouteCompletionProcessor.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.unomi.router.api.ExportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.api.services.ImportExportConfigurationService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Camel processor that appends an execution entry ("date", "extractedProfiles") to the history of the export
+ * configuration carried in the exchange header, then saves that configuration. Created by amidani on 29/06/2017.
+ */
+public class ExportRouteCompletionProcessor implements Processor {
+
+    private static final Logger logger = LoggerFactory.getLogger(ExportRouteCompletionProcessor.class.getName());
+    private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService;
+    private int executionsHistorySize;
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        ExportConfiguration exportConfig = (ExportConfiguration) exchange.getIn().getHeader(RouterConstants.HEADER_EXPORT_CONFIG);
+
+        Map<String, Object> execution = new HashMap<>();
+        execution.put("date", ((Date) exchange.getProperty("CamelCreatedTimestamp")).getTime());
+        execution.put("extractedProfiles", exchange.getProperty("CamelSplitSize"));
+
+        ExportConfiguration exportConfiguration = exportConfigurationService.load(exportConfig.getItemId());
+        // Drop the oldest entry once the history is full; the emptiness check keeps get(0) safe when the limit is <= 0.
+        if (!exportConfiguration.getExecutions().isEmpty() && exportConfiguration.getExecutions().size() >= executionsHistorySize) {
+            int oldestExecIndex = 0;
+            long oldestExecDate = (Long) exportConfiguration.getExecutions().get(0).get("date");
+            for (int i = 1; i < exportConfiguration.getExecutions().size(); i++) {
+                long candidateDate = (Long) exportConfiguration.getExecutions().get(i).get("date");
+                if (candidateDate < oldestExecDate) {
+                    oldestExecDate = candidateDate;
+                    oldestExecIndex = i;
+                }
+            }
+            exportConfiguration.getExecutions().remove(oldestExecIndex);
+        }
+
+        exportConfiguration.getExecutions().add(execution);
+        exportConfigurationService.save(exportConfiguration);
+        logger.info("Processing route {} completed.", exchange.getFromRouteId());
+    }
+
+    public void setExportConfigurationService(ImportExportConfigurationService<ExportConfiguration> exportConfigurationService) {
+        this.exportConfigurationService = exportConfigurationService;
+    }
+
+    public void setExecutionsHistorySize(int executionsHistorySize) {
+        this.executionsHistorySize = executionsHistorySize;
+    }
+}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
similarity index 97%
rename from extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
rename to extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
index b522426..edb7391 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/RouteCompletionProcessor.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/ImportRouteCompletionProcessor.java
@@ -21,8 +21,8 @@
import org.apache.unomi.router.api.ImportConfiguration;
import org.apache.unomi.router.api.ImportLineError;
import org.apache.unomi.router.api.ProfileToImport;
-import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,9 +31,9 @@
/**
* Created by amidani on 14/06/2017.
*/
-public class RouteCompletionProcessor implements Processor {
+public class ImportRouteCompletionProcessor implements Processor {
- private static final Logger logger = LoggerFactory.getLogger(RouteCompletionProcessor.class.getName());
+ private static final Logger logger = LoggerFactory.getLogger(ImportRouteCompletionProcessor.class.getName());
private ImportExportConfigurationService<ImportConfiguration> importConfigurationService;
private int executionsHistorySize;
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
new file mode 100644
index 0000000..6f83741
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/processor/LineBuildProcessor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.router.api.ExportConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Camel processor that renders the {@link Profile} in the message body as one delimited text line, following the
+ * export configuration's "mapping" property (column index to profile property name). Created by amidani on 28/06/2017.
+ */
+public class LineBuildProcessor implements Processor {
+
+    private static final Logger logger = LoggerFactory.getLogger(LineBuildProcessor.class);
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        ExportConfiguration exportConfiguration = (ExportConfiguration) exchange.getIn().getHeader("exportConfig");
+        exchange.getIn().setHeader("destination", exportConfiguration.getProperty("destination"));
+        Profile profile = exchange.getIn().getBody(Profile.class);
+        Map<String, String> mapping = (Map<String, String>) exportConfiguration.getProperty("mapping");
+        // StringBuilder avoids re-copying the partial line on every column.
+        StringBuilder lineToWrite = new StringBuilder();
+        for (int i = 0; i < mapping.size(); i++) {
+            String propertyName = mapping.get(String.valueOf(i));
+            Object value = profile.getProperty(propertyName); // looked up once; a null value becomes an empty cell
+            lineToWrite.append(value != null ? value : "");
+            if (i + 1 < mapping.size()) {
+                lineToWrite.append(exportConfiguration.getColumnSeparator());
+            }
+        }
+        exchange.getIn().setBody(lineToWrite.toString(), String.class);
+    }
+}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java
index dd70033..885713a 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ConfigUpdateRouteBuilder.java
@@ -18,7 +18,6 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.rest.RestBindingMode;
-import org.apache.unomi.api.services.ConfigSharingService;
import org.apache.unomi.router.api.ExportConfiguration;
import org.apache.unomi.router.api.ImportConfiguration;
import org.apache.unomi.router.core.context.RouterCamelContext;
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
index 5c3015e..b67859a 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportCollectRouteBuilder.java
@@ -16,21 +16,25 @@
*/
package org.apache.unomi.router.core.route;
-import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.model.ProcessorDefinition;
import org.apache.commons.lang3.StringUtils;
-import org.apache.unomi.api.Profile;
import org.apache.unomi.persistence.spi.PersistenceService;
import org.apache.unomi.router.api.ExportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
import org.apache.unomi.router.api.services.ImportExportConfigurationService;
+import org.apache.unomi.router.core.bean.CollectProfileBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Map;
/**
* Created by amidani on 27/06/2017.
*/
-public class ProfileExportCollectRouteBuilder extends RouteBuilder {
+public class ProfileExportCollectRouteBuilder extends RouterAbstractRouteBuilder {
private static final Logger logger = LoggerFactory.getLogger(ProfileExportCollectRouteBuilder.class);
@@ -38,7 +42,9 @@
private ImportExportConfigurationService<ExportConfiguration> exportConfigurationService;
private PersistenceService persistenceService;
- private String allowedEndpoints;
+ public ProfileExportCollectRouteBuilder(Map<String, String> kafkaProps, String configType) {
+ super(kafkaProps, configType);
+ }
@Override
public void configure() throws Exception {
@@ -48,16 +54,37 @@
exportConfigurationList = exportConfigurationService.getAll();
}
+ CollectProfileBean collectProfileBean = new CollectProfileBean();
+ collectProfileBean.setPersistenceService(persistenceService);
+
+
//Loop on multiple export configuration
for (final ExportConfiguration exportConfiguration : exportConfigurationList) {
- String endpoint = (String) exportConfiguration.getProperties().get("destination");
-
- if (StringUtils.isNotBlank(endpoint) && allowedEndpoints.contains(endpoint.substring(0, endpoint.indexOf(':')))) {
- List<Profile> profilesCollected = persistenceService.query("segments", (String) exportConfiguration.getProperties().get("segments"),
- null, Profile.class);
- logger.info("Collected +++{}+++ profiles.", profilesCollected.size());
+ if (exportConfiguration.getProperties() != null && exportConfiguration.getProperties().size() > 0) {
+ if ((Map<String, String>) exportConfiguration.getProperties().get("mapping") != null) {
+ String destinationEndpoint = (String) exportConfiguration.getProperties().get("destination");
+ if (StringUtils.isNotBlank(destinationEndpoint) && allowedEndpoints.contains(destinationEndpoint.substring(0, destinationEndpoint.indexOf(':')))) {
+ ProcessorDefinition prDef = from("timer://collectProfile?fixedRate=true&period=" + (String) exportConfiguration.getProperties().get("period"))
+ .autoStartup(exportConfiguration.isActive())
+ .bean(collectProfileBean, "extractProfileBySegment(" + exportConfiguration.getProperties().get("segment") + ")")
+ .split(body())
+ .marshal(jacksonDataFormat)
+ .convertBodyTo(String.class)
+ .setHeader(RouterConstants.HEADER_EXPORT_CONFIG, constant(exportConfiguration))
+ .log(LoggingLevel.DEBUG, "BODY : ${body}");
+ if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+ prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+ } else {
+ prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+ }
+ } else {
+ logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", destinationEndpoint.substring(0, destinationEndpoint.indexOf(':')), exportConfiguration.getItemId());
+ }
+ } else {
+ logger.warn("Mapping is null in export configuration, route {} will be skipped!", exportConfiguration.getItemId());
+ }
} else {
- logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), exportConfiguration.getItemId());
+ logger.warn("Export configuration incomplete, route {} will be skipped!", exportConfiguration.getItemId());
}
}
}
@@ -74,8 +101,4 @@
this.persistenceService = persistenceService;
}
- public void setAllowedEndpoints(String allowedEndpoints) {
- this.allowedEndpoints = allowedEndpoints;
- }
-
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
new file mode 100644
index 0000000..0b0b60a
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileExportProducerRouteBuilder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.route;
+
+import org.apache.camel.component.kafka.KafkaEndpoint;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor;
+import org.apache.unomi.router.core.processor.LineBuildProcessor;
+import org.apache.unomi.router.core.strategy.StringLinesAggregationStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Builds the route that reads the export deposit buffer, turns each message into a text line with
+ * {@link LineBuildProcessor}, aggregates the lines and sends them to the export destination. Created by amidani on 28/06/2017.
+ */
+public class ProfileExportProducerRouteBuilder extends RouterAbstractRouteBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProfileExportProducerRouteBuilder.class);
+
+    private ExportRouteCompletionProcessor exportRouteCompletionProcessor;
+
+    public ProfileExportProducerRouteBuilder(Map<String, String> kafkaProps, String configType) {
+        super(kafkaProps, configType);
+    }
+
+    public void setExportRouteCompletionProcessor(ExportRouteCompletionProcessor exportRouteCompletionProcessor) {
+        this.exportRouteCompletionProcessor = exportRouteCompletionProcessor;
+    }
+
+    /** Defines the export producer route on this builder. */
+    @Override
+    public void configure() throws Exception {
+        logger.info("Configure Recurrent Route 'Export :: Data Producer'");
+
+        // The buffer endpoint is cast to KafkaEndpoint in Kafka mode and to a plain URI string otherwise.
+        final boolean kafkaBuffer = RouterConstants.CONFIG_TYPE_KAFKA.equals(configType);
+        final RouteDefinition exportRoute = kafkaBuffer
+                ? from((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER))
+                : from((String) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER));
+
+        // One aggregation group for all messages; it completes when the aggregated count equals the split size.
+        exportRoute.unmarshal(jacksonDataFormat)
+                .process(new LineBuildProcessor())
+                .aggregate(constant(true), new StringLinesAggregationStrategy())
+                .completionPredicate(exchangeProperty("CamelSplitSize").isEqualTo(exchangeProperty("CamelAggregatedSize")))
+                .eagerCheckCompletion()
+                .process(exportRouteCompletionProcessor)
+                .toD("${in.header.exportConfig.getProperty('destination')}");
+    }
+
+}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
index 7f54884..2dc87f3 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportFromSourceRouteBuilder.java
@@ -23,8 +23,8 @@
import org.apache.camel.model.ProcessorDefinition;
import org.apache.commons.lang3.StringUtils;
import org.apache.unomi.router.api.ImportConfiguration;
-import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.apache.unomi.router.api.RouterConstants;
+import org.apache.unomi.router.api.services.ImportExportConfigurationService;
import org.apache.unomi.router.core.exception.BadProfileDataFormatException;
import org.apache.unomi.router.core.processor.LineSplitFailureHandler;
import org.apache.unomi.router.core.processor.LineSplitProcessor;
@@ -39,15 +39,13 @@
* Created by amidani on 26/04/2017.
*/
-public class ProfileImportFromSourceRouteBuilder extends ProfileImportAbstractRouteBuilder {
+public class ProfileImportFromSourceRouteBuilder extends RouterAbstractRouteBuilder {
private static final Logger logger = LoggerFactory.getLogger(ProfileImportFromSourceRouteBuilder.class.getName());
private List<ImportConfiguration> importConfigurationList;
private ImportExportConfigurationService<ImportConfiguration> importConfigurationService;
- private String allowedEndpoints;
-
public ProfileImportFromSourceRouteBuilder(Map<String, String> kafkaProps, String configType) {
super(kafkaProps, configType);
}
@@ -67,9 +65,9 @@
.process(new LineSplitFailureHandler());
if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
- prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM));
+ prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
} else {
- prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+ prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
}
//Loop on multiple import configuration
@@ -113,9 +111,9 @@
.convertBodyTo(String.class);
if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
- prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM));
+ prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
} else {
- prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+ prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
}
} else {
logger.error("Endpoint scheme {} is not allowed, route {} will be skipped.", endpoint.substring(0, endpoint.indexOf(':')), importConfiguration.getItemId());
@@ -132,8 +130,4 @@
this.importConfigurationService = importConfigurationService;
}
- public void setAllowedEndpoints(String allowedEndpoints) {
- this.allowedEndpoints = allowedEndpoints;
- }
-
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
index a94b5ed..0913876 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportOneShotRouteBuilder.java
@@ -17,7 +17,6 @@
package org.apache.unomi.router.core.route;
import org.apache.camel.LoggingLevel;
-import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.unomi.router.api.RouterConstants;
@@ -33,15 +32,12 @@
/**
* Created by amidani on 22/05/2017.
*/
-public class ProfileImportOneShotRouteBuilder extends ProfileImportAbstractRouteBuilder {
+public class ProfileImportOneShotRouteBuilder extends RouterAbstractRouteBuilder {
private Logger logger = LoggerFactory.getLogger(ProfileImportOneShotRouteBuilder.class.getName());
-
private ImportConfigByFileNameProcessor importConfigByFileNameProcessor;
private String uploadDir;
- private final String IMPORT_ONESHOT_ROUTE_ID = "ONE_SHOT_ROUTE";
-
public ProfileImportOneShotRouteBuilder(Map<String, String> kafkaProps, String configType) {
super(kafkaProps, configType);
}
@@ -57,27 +53,27 @@
.process(new LineSplitFailureHandler());
if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
- prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM));
+ prDefErr.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
} else {
- prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+ prDefErr.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
}
LineSplitProcessor lineSplitProcessor = new LineSplitProcessor();
- ProcessorDefinition prDef = from("file://"+uploadDir+"?include=.*.csv&consumer.delay=1m")
- .routeId(IMPORT_ONESHOT_ROUTE_ID)
+ ProcessorDefinition prDef = from("file://" + uploadDir + "?include=.*.csv&consumer.delay=1m")
+ .routeId(RouterConstants.IMPORT_ONESHOT_ROUTE_ID)
.autoStartup(true)
.process(importConfigByFileNameProcessor)
.split(bodyAs(String.class).tokenize("${in.header.importConfigOneShot.getLineSeparator}"))
- .setHeader("configType", constant(configType))
+ .setHeader(RouterConstants.HEADER_CONFIG_TYPE, constant(configType))
.process(lineSplitProcessor)
.to("log:org.apache.unomi.router?level=INFO")
.marshal(jacksonDataFormat)
.convertBodyTo(String.class);
- if(RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)){
- prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM));
+ if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+ prDef.to((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
} else {
- prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM));
+ prDef.to((String) getEndpointURI(RouterConstants.DIRECTION_FROM, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
}
}
@@ -88,8 +84,4 @@
public void setUploadDir(String uploadDir) {
this.uploadDir = uploadDir;
}
-
- public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
- this.jacksonDataFormat = jacksonDataFormat;
- }
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
index d75977b..759dde4 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportToUnomiRouteBuilder.java
@@ -17,11 +17,10 @@
package org.apache.unomi.router.core.route;
import org.apache.camel.LoggingLevel;
-import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.apache.camel.model.RouteDefinition;
import org.apache.unomi.router.api.RouterConstants;
-import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
+import org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor;
import org.apache.unomi.router.core.processor.UnomiStorageProcessor;
import org.apache.unomi.router.core.strategy.ArrayListAggregationStrategy;
import org.slf4j.Logger;
@@ -32,12 +31,12 @@
/**
* Created by amidani on 26/04/2017.
*/
-public class ProfileImportToUnomiRouteBuilder extends ProfileImportAbstractRouteBuilder {
+public class ProfileImportToUnomiRouteBuilder extends RouterAbstractRouteBuilder {
private Logger logger = LoggerFactory.getLogger(ProfileImportToUnomiRouteBuilder.class.getName());
private UnomiStorageProcessor unomiStorageProcessor;
- private RouteCompletionProcessor routeCompletionProcessor;
+ private ImportRouteCompletionProcessor importRouteCompletionProcessor;
public ProfileImportToUnomiRouteBuilder(Map<String, String> kafkaProps, String configType) {
super(kafkaProps, configType);
@@ -50,9 +49,9 @@
RouteDefinition rtDef;
if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
- rtDef = from((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_TO));
+ rtDef = from((KafkaEndpoint) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
} else {
- rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO));
+ rtDef = from((String) getEndpointURI(RouterConstants.DIRECTION_TO, RouterConstants.DIRECT_IMPORT_DEPOSIT_BUFFER));
}
rtDef.choice()
.when(header(RouterConstants.HEADER_FAILED_MESSAGE).isNull())
@@ -64,7 +63,7 @@
.aggregate(constant(true), new ArrayListAggregationStrategy())
.completionPredicate(exchangeProperty("CamelSplitComplete").isEqualTo("true"))
.eagerCheckCompletion()
- .process(routeCompletionProcessor)
+ .process(importRouteCompletionProcessor)
.to("log:org.apache.unomi.router?level=INFO");
}
@@ -72,11 +71,7 @@
this.unomiStorageProcessor = unomiStorageProcessor;
}
- public void setRouteCompletionProcessor(RouteCompletionProcessor routeCompletionProcessor) {
- this.routeCompletionProcessor = routeCompletionProcessor;
- }
-
- public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
- this.jacksonDataFormat = jacksonDataFormat;
+ public void setImportRouteCompletionProcessor(ImportRouteCompletionProcessor importRouteCompletionProcessor) {
+ this.importRouteCompletionProcessor = importRouteCompletionProcessor;
}
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
similarity index 63%
rename from extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
rename to extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
index bacc38e..5db9917 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/ProfileImportAbstractRouteBuilder.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/route/RouterAbstractRouteBuilder.java
@@ -29,49 +29,60 @@
/**
* Created by amidani on 13/06/2017.
*/
-public abstract class ProfileImportAbstractRouteBuilder extends RouteBuilder {
+public abstract class RouterAbstractRouteBuilder extends RouteBuilder {
protected JacksonDataFormat jacksonDataFormat;
protected String kafkaHost;
protected String kafkaPort;
protected String kafkaImportTopic;
+ protected String kafkaExportTopic;
protected String kafkaImportGroupId;
- protected String kafkaImportConsumerCount;
- protected String kafkaImportAutoCommit;
+ protected String kafkaExportGroupId;
+ protected String kafkaConsumerCount;
+ protected String kafkaAutoCommit;
protected String configType;
+ protected String allowedEndpoints;
- public ProfileImportAbstractRouteBuilder(Map<String, String> kafkaProps, String configType) {
+ public RouterAbstractRouteBuilder(Map<String, String> kafkaProps, String configType) {
this.kafkaHost = kafkaProps.get("kafkaHost");
this.kafkaPort = kafkaProps.get("kafkaPort");
this.kafkaImportTopic = kafkaProps.get("kafkaImportTopic");
+ this.kafkaExportTopic = kafkaProps.get("kafkaExportTopic");
this.kafkaImportGroupId = kafkaProps.get("kafkaImportGroupId");
- this.kafkaImportConsumerCount = kafkaProps.get("kafkaImportConsumerCount");
- this.kafkaImportAutoCommit = kafkaProps.get("kafkaImportAutoCommit");
+ this.kafkaExportGroupId = kafkaProps.get("kafkaExportGroupId");
+ this.kafkaConsumerCount = kafkaProps.get("kafkaConsumerCount");
+ this.kafkaAutoCommit = kafkaProps.get("kafkaAutoCommit");
this.configType = configType;
}
- public Object getEndpointURI(String direction) {
+ public Object getEndpointURI(String direction, String operationDepositBuffer) {
Object endpoint;
if (RouterConstants.CONFIG_TYPE_KAFKA.equals(configType)) {
+ String kafkaTopic = kafkaImportTopic;
+ String kafkaGroupId = kafkaImportGroupId;
+ if (RouterConstants.DIRECT_EXPORT_DEPOSIT_BUFFER.equals(operationDepositBuffer)) {
+ kafkaTopic = kafkaExportTopic;
+ kafkaGroupId = kafkaExportGroupId;
+ }
//Prepare Kafka Deposit
StringBuilder kafkaUri = new StringBuilder("kafka:");
- kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaImportTopic);
- if (StringUtils.isNotBlank(kafkaImportGroupId)) {
- kafkaUri.append("&groupId=" + kafkaImportGroupId);
+ kafkaUri.append(kafkaHost).append(":").append(kafkaPort).append("?topic=").append(kafkaTopic);
+ if (StringUtils.isNotBlank(kafkaGroupId)) {
+ kafkaUri.append("&groupId=" + kafkaGroupId);
}
if (RouterConstants.DIRECTION_TO.equals(direction)) {
- kafkaUri.append("&autoCommitEnable=" + kafkaImportAutoCommit + "&consumersCount=" + kafkaImportConsumerCount);
+ kafkaUri.append("&autoCommitEnable=" + kafkaAutoCommit + "&consumersCount=" + kafkaConsumerCount);
}
KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
kafkaConfiguration.setBrokers(kafkaHost + ":" + kafkaPort);
- kafkaConfiguration.setTopic(kafkaImportTopic);
- kafkaConfiguration.setGroupId(kafkaImportGroupId);
+ kafkaConfiguration.setTopic(kafkaTopic);
+ kafkaConfiguration.setGroupId(kafkaGroupId);
endpoint = new KafkaEndpoint(kafkaUri.toString(), new KafkaComponent(this.getContext()));
((KafkaEndpoint) endpoint).setConfiguration(kafkaConfiguration);
} else {
- endpoint = RouterConstants.DIRECT_DEPOSIT_BUFFER;
+ endpoint = operationDepositBuffer;
}
return endpoint;
@@ -80,4 +91,8 @@
public void setJacksonDataFormat(JacksonDataFormat jacksonDataFormat) {
this.jacksonDataFormat = jacksonDataFormat;
}
+
+ public void setAllowedEndpoints(String allowedEndpoints) {
+ this.allowedEndpoints = allowedEndpoints;
+ }
}
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
index a53e34b..ca87ad3 100644
--- a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/ArrayListAggregationStrategy.java
@@ -18,8 +18,6 @@
import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy;
-import org.apache.unomi.router.core.processor.RouteCompletionProcessor;
import java.util.ArrayList;
diff --git a/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
new file mode 100644
index 0000000..5a69001
--- /dev/null
+++ b/extensions/router/router-core/src/main/java/org/apache/unomi/router/core/strategy/StringLinesAggregationStrategy.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.unomi.router.core.strategy;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.unomi.router.api.ExportConfiguration;
+import org.apache.unomi.router.api.RouterConstants;
+
+/**
+ * Aggregation strategy that joins the String bodies of the aggregated exchanges
+ * into a single String, separated by the export configuration's line separator.
+ *
+ * Created by amidani on 29/06/2017.
+ */
+public class StringLinesAggregationStrategy implements AggregationStrategy {
+
+    /**
+     * Appends the body of {@code newExchange} to the content held by {@code oldExchange}.
+     *
+     * @param oldExchange exchange holding the content aggregated so far; {@code null} when nothing has been aggregated yet
+     * @param newExchange incoming exchange whose String body is the next line
+     * @return {@code newExchange} when {@code oldExchange} is {@code null}, otherwise {@code oldExchange} with the extended body
+     */
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            // Nothing to join yet, so the export configuration header is not needed here.
+            return newExchange;
+        }
+        String aggregated = oldExchange.getIn().getBody(String.class);
+        String line = newExchange.getIn().getBody(String.class);
+        oldExchange.getIn().setBody(aggregated + resolveLineSeparator(newExchange) + line);
+        return oldExchange;
+    }
+
+    /**
+     * Returns the line separator of the export configuration carried in the exchange header, or
+     * {@link RouterConstants#DEFAULT_FILE_LINE_SEPARATOR} when the header or its separator is missing.
+     */
+    private String resolveLineSeparator(Exchange exchange) {
+        ExportConfiguration exportConfiguration =
+                exchange.getIn().getHeader(RouterConstants.HEADER_EXPORT_CONFIG, ExportConfiguration.class);
+        String lineSeparator = exportConfiguration != null ? exportConfiguration.getLineSeparator() : null;
+        return lineSeparator != null ? lineSeparator : RouterConstants.DEFAULT_FILE_LINE_SEPARATOR;
+    }
+}
diff --git a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
index 5ae1e9c..3b155b2 100644
--- a/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
+++ b/extensions/router/router-core/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -27,15 +27,16 @@
<cm:default-properties>
<cm:property name="config.type" value="nobroker"/>
<cm:property name="config.allowedEndpoints" value="file,ftp"/>
- <cm:property name="config.internalPort" value="8233"/>
<cm:property name="kafka.host" value="localhost"/>
<cm:property name="kafka.port" value="9092"/>
- <cm:property name="kafka.import.topic" value="camel-deposit"/>
+ <cm:property name="kafka.import.topic" value="import-deposit"/>
+ <cm:property name="kafka.export.topic" value="export-deposit"/>
<cm:property name="kafka.import.groupId" value="unomi-import-group"/>
- <cm:property name="kafka.import.consumerCount" value="10"/>
- <cm:property name="kafka.import.autoCommit" value="true"/>
+ <cm:property name="kafka.export.groupId" value="unomi-export-group"/>
+ <cm:property name="kafka.consumerCount" value="10"/>
+ <cm:property name="kafka.autoCommit" value="true"/>
<cm:property name="import.oneshot.uploadDir" value="/tmp/oneshot_import_configs/"/>
- <cm:property name="import.executionsHistory.size" value="5"/>
+ <cm:property name="executionsHistory.size" value="5"/>
</cm:default-properties>
</cm:property-placeholder>
@@ -43,9 +44,14 @@
<property name="profileImportService" ref="profileImportService"/>
</bean>
- <bean id="routeCompletionProcessor" class="org.apache.unomi.router.core.processor.RouteCompletionProcessor">
+ <bean id="importRouteCompletionProcessor" class="org.apache.unomi.router.core.processor.ImportRouteCompletionProcessor">
<property name="importConfigurationService" ref="importConfigurationService"/>
- <property name="executionsHistorySize" value="${import.executionsHistory.size}"/>
+ <property name="executionsHistorySize" value="${executionsHistory.size}"/>
+ </bean>
+
+ <bean id="exportRouteCompletionProcessor" class="org.apache.unomi.router.core.processor.ExportRouteCompletionProcessor">
+ <property name="exportConfigurationService" ref="exportConfigurationService"/>
+ <property name="executionsHistorySize" value="${executionsHistory.size}"/>
</bean>
<bean id="importConfigByFileNameProcessor" class="org.apache.unomi.router.core.processor.ImportConfigByFileNameProcessor">
@@ -86,26 +92,29 @@
init-method="initCamelContext" destroy-method="preDestroy">
<property name="configType" value="${config.type}"/>
<property name="allowedEndpoints" value="${config.allowedEndpoints}"/>
+ <property name="uploadDir" value="${import.oneshot.uploadDir}"/>
+ <property name="bundleContext" ref="blueprintBundleContext"/>
+ <property name="jacksonDataFormat" ref="jacksonDataFormat"/>
<property name="kafkaProps">
<map>
<entry key="kafkaHost" value="${kafka.host}"/>
<entry key="kafkaPort" value="${kafka.port}"/>
<entry key="kafkaImportTopic" value="${kafka.import.topic}"/>
+ <entry key="kafkaExportTopic" value="${kafka.export.topic}"/>
<entry key="kafkaImportGroupId" value="${kafka.import.groupId}"/>
- <entry key="kafkaImportConsumerCount" value="${kafka.import.consumerCount}"/>
- <entry key="kafkaImportAutoCommit" value="${kafka.import.autoCommit}"/>
+ <entry key="kafkaExportGroupId" value="${kafka.export.groupId}"/>
+ <entry key="kafkaConsumerCount" value="${kafka.consumerCount}"/>
+ <entry key="kafkaAutoCommit" value="${kafka.autoCommit}"/>
</map>
</property>
- <property name="uploadDir" value="${import.oneshot.uploadDir}"/>
<property name="unomiStorageProcessor" ref="unomiStorageProcessor"/>
- <property name="routeCompletionProcessor" ref="routeCompletionProcessor"/>
+ <property name="importRouteCompletionProcessor" ref="importRouteCompletionProcessor"/>
+ <property name="exportRouteCompletionProcessor" ref="exportRouteCompletionProcessor"/>
<property name="importConfigByFileNameProcessor" ref="importConfigByFileNameProcessor"/>
- <property name="importConfigurationService" ref="importConfigurationService"/>
- <property name="exportConfigurationService" ref="exportConfigurationService"/>
- <property name="persistenceService" ref="persistenceService"/>
- <property name="jacksonDataFormat" ref="jacksonDataFormat"/>
- <property name="bundleContext" ref="blueprintBundleContext"/>
<property name="configSharingService" ref="configSharingService" />
+ <property name="exportConfigurationService" ref="exportConfigurationService"/>
+ <property name="importConfigurationService" ref="importConfigurationService"/>
+ <property name="persistenceService" ref="persistenceService"/>
</bean>
<camel:camelContext id="httpEndpoint" xmlns="http://camel.apache.org/schema/blueprint">
@@ -116,6 +125,10 @@
<property name="routerCamelContext" ref="camelContext"/>
</bean>
+ <bean id="collectProfileBean" class="org.apache.unomi.router.core.bean.CollectProfileBean">
+ <property name="persistenceService" ref="persistenceService"/>
+ </bean>
+
<reference id="configSharingService" interface="org.apache.unomi.api.services.ConfigSharingService" />
<reference id="httpService" interface="org.osgi.service.http.HttpService"/>
<reference id="profileImportService" interface="org.apache.unomi.router.api.services.ProfileImportService"/>
diff --git a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
index 2aa385f..8f29d65 100644
--- a/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
+++ b/extensions/router/router-core/src/main/resources/org.apache.unomi.router.cfg
@@ -23,19 +23,18 @@
#Kafka
#kafka.host=localhost
#kafka.port=9092
-#kafka.import.topic=camel-deposit
+#kafka.import.topic=import-deposit
+#kafka.export.topic=export-deposit
#kafka.import.groupId=unomi-import-group
-#kafka.import.consumerCount=10
-#kafka.import.autoCommit=true
+#kafka.export.groupId=unomi-export-group
+#kafka.consumerCount=10
+#kafka.autoCommit=true
#Import One Shot upload directory
import.oneshot.uploadDir=${karaf.data}/tmp/unomi_oneshot_import_configs/
-#Import executions history size
-import.executionsHistory.size=5
+#Import/Export executions history size
+executionsHistory.size=5
#Allowed source endpoints
-config.allowedEndpoints=file,ftp
-
-#Internal Camel REST services port (Not public - DO NOT OPEN TO PUBLIC)
-config.internalPort=8233
\ No newline at end of file
+config.allowedEndpoints=file,ftp
\ No newline at end of file