Switched to use RuntimeCatalog.asEndpointUri method to construct the uri to better suport endpoints paths fix #143 fix #144
diff --git a/core/pom.xml b/core/pom.xml
index 125cdd6..548a04a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -41,6 +41,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core-catalog</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
             <artifactId>camel-main</artifactId>
         </dependency>
         <dependency>
@@ -114,6 +118,11 @@
             <artifactId>camel-debezium-common</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-cassandraql</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index baa6665..f9252d0 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -23,8 +23,11 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
 import org.apache.camel.support.DefaultExchange;
@@ -67,11 +70,16 @@
             String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
             final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
 
+            CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
-                remoteUrl = TaskHelper.buildUrl(actualProps, config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF), CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SINK_PATH_PROPERTIES_PREFIX);
+                remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+                                                actualProps,
+                                                config.getString(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF),
+                                                CAMEL_SINK_ENDPOINT_PROPERTIES_PREFIX,
+                                                CAMEL_SINK_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, marshaller, null);
+            cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, marshaller, null, camelContext);
 
             producer = cms.createProducerTemplate();
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 83ea1b1..ceecf94 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -27,9 +27,12 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.PollingConsumer;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.kafkaconnector.utils.CamelMainSupport;
 import org.apache.camel.kafkaconnector.utils.SchemaHelper;
 import org.apache.camel.kafkaconnector.utils.TaskHelper;
@@ -81,11 +84,16 @@
 
             String localUrl = getLocalUrlWithPollingOptions(config);
 
+            CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
-                remoteUrl = TaskHelper.buildUrl(actualProps, config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
+                remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(),
+                                                actualProps,
+                                                config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF),
+                                                CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX,
+                                                CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, unmarshaller);
+            cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, unmarshaller, camelContext);
 
             Endpoint endpoint = cms.getEndpoint(localUrl);
             consumer = endpoint.createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index 91735f2..d84a41e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -28,8 +28,10 @@
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.ConsumerTemplate;
 import org.apache.camel.Endpoint;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.catalog.RuntimeCamelCatalog;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.main.BaseMainSupport;
 import org.apache.camel.main.Main;
@@ -148,6 +150,10 @@
         return camel.createConsumerTemplate();
     }
 
+    public RuntimeCamelCatalog getRuntimeCamelCatalog() {
+        return camel.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
+    }
+
     private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
         DataFormat df = camel.resolveDataFormat(dataformatName);
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
index 6259985..c623662 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/TaskHelper.java
@@ -16,16 +16,27 @@
  */
 package org.apache.camel.kafkaconnector.utils;
 
+import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.camel.catalog.RuntimeCamelCatalog;
+
 public final class TaskHelper {
 
     private TaskHelper() {
     }
 
+    public static String buildUrl(RuntimeCamelCatalog rcc, Map<String, String> props, String componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) throws URISyntaxException {
+        Map<String, String> filteredProps = new HashMap<>();
+        props.keySet().stream()
+                .filter(k -> k.startsWith(endpointPropertiesPrefix) || k.startsWith(pathPropertiesPrefix))
+                .forEach(k -> filteredProps.put(k.replace(endpointPropertiesPrefix, "").replace(pathPropertiesPrefix, ""), props.get(k)));
+        return rcc.asEndpointUri(componentSchema, filteredProps, false);
+    }
+
     public static String buildUrl(Map<String, String> props, String componentSchema, String endpointPropertiesPrefix, String pathPropertiesPrefix) {
         final String urlPath = createUrlPathFromProperties(props, pathPropertiesPrefix);
         final String endpointOptions = createEndpointOptionsFromProperties(props, endpointPropertiesPrefix);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 3003afa..32e3bc6 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -489,7 +489,7 @@
         props.put("topics", "mytopic");
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
-        props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "test");
+        props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
         CamelSinkTask camelSinkTask = new CamelSinkTask();
         camelSinkTask.start(props);
@@ -518,7 +518,7 @@
         props.put(CamelSinkConnectorConfig.CAMEL_SINK_COMPONENT_CONF, "seda");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "bridgeErrorHandler", "true");
         props.put(CamelSinkTask.getCamelSinkEndpointConfigPrefix() + "size", "50");
-        props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "pathChunk", "test");
+        props.put(CamelSinkTask.getCamelSinkPathConfigPrefix() + "name", "test");
 
         CamelSinkTask camelSinkTask = new CamelSinkTask();
         camelSinkTask.start(props);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
index a4ad86c..830a013 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSourceTaskTest.java
@@ -273,7 +273,7 @@
         props.put("camel.source.kafka.topic", "mytopic");
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer");
         props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "1000");
-        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "pathChunk", "kafkaconnector");
+        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "timerName", "kafkaconnector");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
@@ -306,7 +306,7 @@
         props.put(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF, "timer");
         props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "period", "1000");
         props.put(CamelSourceTask.getCamelSourceEndpointConfigPrefix() + "repeatCount", "0");
-        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "pathChunk", "kafkaconnector");
+        props.put(CamelSourceTask.getCamelSourcePathConfigPrefix() + "timerName", "kafkaconnector");
 
         CamelSourceTask camelSourceTask = new CamelSourceTask();
         camelSourceTask.start(props);
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
index 2f32329..ea6938d 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/TaskHelperTest.java
@@ -16,10 +16,14 @@
  */
 package org.apache.camel.kafkaconnector.utils;
 
+import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.catalog.RuntimeCamelCatalog;
+import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -175,4 +179,28 @@
 
         assertEquals("test:value2?key1=value1", result);
     }
+
+    @Test
+    public void testBuildUrlWithRuntimeCatalog() throws URISyntaxException {
+        DefaultCamelContext dcc = new DefaultCamelContext();
+        RuntimeCamelCatalog rcc = dcc.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog();
+        Map<String, String> props = new HashMap<String, String>() {{
+                put("prefix.name", "test");
+                put("anotherPrefix.synchronous", "true");
+            }};
+
+        String result = TaskHelper.buildUrl(rcc, props, "direct", "prefix.", "anotherPrefix.");
+
+        assertEquals("direct:test?synchronous=true", result);
+
+        props = new HashMap<String, String>() {{
+                put("prefix.port", "8080");
+                put("anotherPrefix.keyspace", "test");
+                put("anotherPrefix.hosts", "localhost");
+            }};
+
+        result = TaskHelper.buildUrl(rcc, props, "cql", "prefix.", "anotherPrefix.");
+
+        assertEquals("cql:localhost:8080/test", result);
+    }
 }
\ No newline at end of file