Switched to use RuntimeCatalog.asEndpointUri method to construct the uri to better suport endpoints paths fix #143 fix #144
diff --git a/core/pom.xml b/core/pom.xml
index 125cdd6..548a04a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -41,6 +41,10 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-core-catalog</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-main</artifactId>
</dependency>
<dependency>
@@ -114,6 +118,11 @@
<artifactId>camel-debezium-common</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-cassandraql</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index baa6665..f9252d0 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -23,8 +23,11 @@
import java.util.List;
import java.util.Map;
+import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ProducerTemplate;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
import org.apache.camel.kafkaconnector.utils.TaskHelper;
import org.apache.camel.support.DefaultExchange;
@@ -67,11 +70,16 @@
String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
+ CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
- remoteUrl = TaskHelper.buildUrl(actualProps, config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF), CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SINK_PATH_PROPERTIES_PREFIX);
+ remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+ actualProps,
+ config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF),
+ CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
+ CAMEL_SINK_PATH_PROPERTIES_PREFIX);
}
- cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, marshaller, null);
+ cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, marshaller, null, camelContext);
producer = cms.createProducerTemplate();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 83ea1b1..ceecf94 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -27,9 +27,12 @@
import java.util.List;
import java.util.Map;
+import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.PollingConsumer;
+import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
import org.apache.camel.kafkaconnector.utils.SchemaHelper;
import org.apache.camel.kafkaconnector.utils.TaskHelper;
@@ -81,11 +84,16 @@
String localUrl = getLocalUrlWithPollingOptions(config);
+ CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
- remoteUrl = TaskHelper.buildUrl(actualProps, config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
+ remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+ actualProps,
+ config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
+ CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
+ CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
}
- cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, unmarshaller);
+ cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, unmarshaller, camelContext);
Endpoint endpoint = cms.getEndpoint(localUrl);
consumer = endpoint.createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index 91735f2..d84a41e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -28,8 +28,10 @@
import org.apache.camel.CamelContextAware;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.catalog.RuntimeCamelCatalog;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.Main;
@@ -148,6 +150,10 @@
return camel.createConsumerTemplate();
}
+ public RuntimeCamelCatalog getRuntimeCamelCatalog() {
+ return camel.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
+ }
+
private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
DataFormat df = camel.resolveDataFormat(dataformatName);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index 6259985..c623662 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -16,16 +16,27 @@
*/
package org.apache.camel.kafkaconnector.utils;
+import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import org.apache.camel.catalog.RuntimeCamelCatalog;
+
public final class TaskHelper {
private TaskHelper() {
}
+ public static String buildUrl(RuntimeCamelCatalog rcc, Map<String, String> props, String componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) throws URISyntaxException {
+ Map<String, String> filteredProps = new HashMap<>();
+ props.keySet().stream()
+ .filter(k -> k.startsWith(endpointPropertiesPrefix) || k.startsWith(pathPropertiesPrefix))
+ .forEach(k -> filteredProps.put(k.replace(endpointPropertiesPrefix, "").replace(pathPropertiesPrefix, ""), props.get(k)));
+ return rcc.asEndpointUri(componentSchema, filteredProps, false);
+ }
+
public static String buildUrl(Map<String, String> props, String componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) {
final String urlPath = createUrlPathFromProperties(props, pathPropertiesPrefix);
final String endpointOptions = createEndpointOptionsFromProperties(props, endpointPropertiesPrefix);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 3003afa..32e3bc6 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -489,7 +489,7 @@
props.put("topics", "mytopic");
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
- props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "test");
+ props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
@@ -518,7 +518,7 @@
props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50");
- props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "test");
+ props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
CamelSinkTask camelSinkTask = new CamelSinkTask();
camelSinkTask.start(props);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index a4ad86c..830a013 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -273,7 +273,7 @@
props.put("camel.source.kafka.topic", "mytopic");
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer");
props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "1000");
- props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "pathChunk", "kafkaconnector");
+ props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "timerName", "kafkaconnector");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
@@ -306,7 +306,7 @@
props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer");
props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "1000");
props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "repeatCount", "0");
- props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "pathChunk", "kafkaconnector");
+ props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "timerName", "kafkaconnector");
CamelSourceTask camelSourceTask = new CamelSourceTask();
camelSourceTask.start(props);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 2f32329..ea6938d 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -16,10 +16,14 @@
*/
package org.apache.camel.kafkaconnector.utils;
+import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.catalog.RuntimeCamelCatalog;
+import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -175,4 +179,28 @@
assertEquals("test:value2?key1=value1", result);
}
+
+ @Test
+ public void testBuildUrlWithRuntimeCatalog() throws URISyntaxException {
+ DefaultCamelContext dcc = new DefaultCamelContext();
+ RuntimeCamelCatalog rcc = dcc.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
+ Map<String, String> props = new HashMap<String, String>() {{
+ put("prefix.name", "test");
+ put("anotherPrefix.synchronous", "true");
+ }};
+
+ String result = TaskHelper.buildUrl(rcc, props, "direct", "prefix.", "anotherPrefix.");
+
+ assertEquals("direct:test?synchronous=true", result);
+
+ props = new HashMap<String, String>() {{
+ put("prefix.port", "8080");
+ put("anotherPrefix.keyspace", "test");
+ put("anotherPrefix.hosts", "localhost");
+ }};
+
+ result = TaskHelper.buildUrl(rcc, props, "cql", "prefix.", "anotherPrefix.");
+
+ assertEquals("cql:localhost:8080/test", result);
+ }
}
\ No newline at end of file