CAMEL-21807: camel-kamelet - Custom kamelet with dynamic query parameter with sql component
diff --git a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
index 8a70d4c..7f92e50 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/component/sql/SqlComponent.java
@@ -26,6 +26,7 @@
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.HealthCheckComponent;
import org.apache.camel.support.PropertyBindingSupport;
+import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.PropertiesHelper;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -65,13 +66,17 @@
@Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+ String query = getAndRemoveParameter(parameters, "query", String.class);
+ if (query == null) {
+ query = remaining;
+ }
+ if (ObjectHelper.isEmpty(query)) {
+ throw new IllegalArgumentException("Query parameter is required");
+ }
String parameterPlaceholderSubstitute = getAndRemoveParameter(parameters, "placeholder", String.class, "#");
-
- String query = remaining;
if (usePlaceholder) {
query = query.replaceAll(parameterPlaceholderSubstitute, "?");
}
-
String onConsume = getAndRemoveParameter(parameters, "consumer.onConsume", String.class);
if (onConsume == null) {
onConsume = getAndRemoveParameter(parameters, "onConsume", String.class);
diff --git a/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDynamicKameletTest.java b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDynamicKameletTest.java
new file mode 100644
index 0000000..20995c8
--- /dev/null
+++ b/components/camel-sql/src/test/java/org/apache/camel/component/sql/SqlDynamicKameletTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.sql;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
+import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class SqlDynamicKameletTest extends CamelTestSupport {
+
+ EmbeddedDatabase db;
+
+ @Override
+
+ public void doPreSetup() throws Exception {
+ db = new EmbeddedDatabaseBuilder()
+ .setName(getClass().getSimpleName())
+ .setType(EmbeddedDatabaseType.H2)
+ .addScript("sql/createAndPopulateDatabase.sql").build();
+ }
+
+ @Override
+ public void doPostTearDown() throws Exception {
+ if (db != null) {
+ db.shutdown();
+ }
+ }
+
+ @Test
+ public void testSimulateDynamicKamelet() throws InterruptedException {
+ MockEndpoint mock = getMockEndpoint("mock:query");
+ mock.expectedMessageCount(1);
+
+ template.requestBodyAndHeader("direct:query", "ASF", "names", "Camel,AMQ");
+
+ MockEndpoint.assertIsSatisfied(context);
+
+ List list = mock.getReceivedExchanges().get(0).getIn().getBody(List.class);
+ assertEquals(2, list.size());
+ Map row = (Map) list.get(0);
+ assertEquals("Camel", row.get("PROJECT"));
+ row = (Map) list.get(1);
+ assertEquals("AMQ", row.get("PROJECT"));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ // required for the sql component
+ getContext().getComponent("sql", SqlComponent.class).setDataSource(db);
+
+ context.getPropertiesComponent().addInitialProperty("localSql", "sql");
+ context.getPropertiesComponent().addInitialProperty("myQuery", "classpath:sql/selectProjectsAndIn.sql");
+
+ from("direct:query")
+ .to("{{localSql}}?query={{myQuery}}")
+ .to("log:query")
+ .to("mock:query");
+ }
+ };
+ }
+}
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index 2412426..1c8bc5c 100644
--- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -785,16 +785,29 @@
}
if (answer == null) {
try {
- scheme = StringHelper.before(uri, ":");
+ // the uri may not contain a scheme such as a dynamic kamelet
+ // so we need to find the component name via the first text before : or ? mark
+ int pos1 = uri.indexOf(':');
+ int pos2 = uri.indexOf('?');
+ if (pos1 != -1 && pos2 != -1) {
+ scheme = uri.substring(0, Math.min(pos1, pos2));
+ } else if (pos1 != -1) {
+ scheme = uri.substring(0, pos1);
+ } else if (pos2 != -1) {
+ scheme = uri.substring(0, pos2);
+ } else {
+ scheme = null;
+ }
if (scheme == null) {
// it may refer to a logical endpoint
answer = camelContextExtension.getRegistry().lookupByNameAndType(uri, Endpoint.class);
if (answer != null) {
return answer;
- } else {
- throw new NoSuchEndpointException(uri);
}
}
+ if (scheme == null) {
+ scheme = uri;
+ }
LOG.trace("Endpoint uri: {} is from component with name: {}", uri, scheme);
Component component = getComponent(scheme);
ServiceHelper.initService(component);