Create a Camel ReactiveStreams extension #304
diff --git a/docs/modules/ROOT/pages/list-of-camel-quarkus-extensions.adoc b/docs/modules/ROOT/pages/list-of-camel-quarkus-extensions.adoc
index 7c22ea0..998cb7c 100644
--- a/docs/modules/ROOT/pages/list-of-camel-quarkus-extensions.adoc
+++ b/docs/modules/ROOT/pages/list-of-camel-quarkus-extensions.adoc
@@ -15,7 +15,7 @@
== Camel Components
// components: START
-Number of Camel components: 60 in 51 JAR artifacts (0 deprecated)
+Number of Camel components: 61 in 52 JAR artifacts (0 deprecated)
[width="100%",cols="4,1,5",options="header"]
|===
@@ -141,6 +141,9 @@
| xref:extensions/platform-http.adoc[Platform HTTP] (camel-quarkus-platform-http) +
`platform-http:path` | 0.3.0 | HTTP service leveraging existing runtime platform HTTP server
+| link:https://camel.apache.org/components/latest/reactive-streams-component.html[Reactive Streams] (camel-quarkus-reactive-streams) +
+`reactive-streams:stream` | 1.0.0 | Reactive Camel using reactive streams
+
| link:https://camel.apache.org/components/latest/rest-component.html[REST] (camel-quarkus-rest) +
`rest:method:path:uriTemplate` | 0.2.0 | The rest component is used for either hosting REST services (consumer) or calling external REST services (producer).
diff --git a/extensions/pom.xml b/extensions/pom.xml
index f17123b..181534c 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -90,6 +90,7 @@
<module>paho</module>
<module>pdf</module>
<module>platform-http</module>
+ <module>reactive-streams</module>
<module>rest</module>
<module>salesforce</module>
<module>scheduler</module>
diff --git a/extensions/reactive-streams/deployment/pom.xml b/extensions/reactive-streams/deployment/pom.xml
new file mode 100644
index 0000000..2516630
--- /dev/null
+++ b/extensions/reactive-streams/deployment/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-reactive-streams-parent</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>camel-quarkus-reactive-streams-deployment</artifactId>
+ <name>Camel Quarkus :: Reactive Streams :: Deployment</name>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-bom-deployment</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-core-deployment</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-reactive-streams</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <annotationProcessorPaths>
+ <path>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-extension-processor</artifactId>
+ <version>${quarkus.version}</version>
+ </path>
+ </annotationProcessorPaths>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsProcessor.java b/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsProcessor.java
new file mode 100644
index 0000000..e5de2a4
--- /dev/null
+++ b/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsProcessor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.deployment;
+
+import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
+import io.quarkus.arc.deployment.BeanContainerBuildItem;
+import io.quarkus.deployment.annotations.BuildProducer;
+import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.annotations.ExecutionTime;
+import io.quarkus.deployment.annotations.Overridable;
+import io.quarkus.deployment.annotations.Record;
+import io.quarkus.deployment.builditem.FeatureBuildItem;
+import org.apache.camel.quarkus.component.reactive.streams.ReactiveStreamsProducers;
+import org.apache.camel.quarkus.component.reactive.streams.ReactiveStreamsRecorder;
+import org.apache.camel.quarkus.core.Flags;
+import org.apache.camel.quarkus.core.deployment.CamelBeanBuildItem;
+import org.apache.camel.quarkus.core.deployment.CamelContextBuildItem;
+import org.apache.camel.quarkus.core.deployment.CamelServiceFilter;
+import org.apache.camel.quarkus.core.deployment.CamelServiceFilterBuildItem;
+
+class ReactiveStreamsProcessor {
+ private static final String SCHEME = "reactive-streams";
+ private static final String FEATURE = "camel-reactive-streams";
+
+ @BuildStep
+ FeatureBuildItem feature() {
+ return new FeatureBuildItem(FEATURE);
+ }
+
+ @BuildStep
+ CamelServiceFilterBuildItem serviceFilter() {
+ return new CamelServiceFilterBuildItem(CamelServiceFilter.forComponent(SCHEME));
+ }
+
+ @BuildStep(onlyIf = Flags.MainEnabled.class)
+ void beans(BuildProducer<AdditionalBeanBuildItem> beanProducer) {
+ // thi extension will made some reactive camel reactive streams object availbale
+ // for injection in order to easy the use CamelReactiveStreams in CDI.
+ //
+ // For more info about what object are published, have a look at
+ // org.apache.camel.quarkus.component.reactive.streamsReactiveStreamsProducers
+ beanProducer.produce(AdditionalBeanBuildItem.unremovableOf(ReactiveStreamsProducers.class));
+ }
+
+ @Overridable
+ @BuildStep
+ @Record(value = ExecutionTime.STATIC_INIT, optional = true)
+ public ReactiveStreamsServiceFactoryBuildItem defaultReactiveStreamsServiceFactory(
+ ReactiveStreamsRecorder recorder) {
+ return new ReactiveStreamsServiceFactoryBuildItem(recorder.createDefaultReactiveStreamsServiceFactory());
+ }
+
+ @Record(ExecutionTime.STATIC_INIT)
+ @BuildStep
+ CamelBeanBuildItem reactiveStreamsComponent(
+ ReactiveStreamsRecorder recorder,
+ ReactiveStreamsServiceFactoryBuildItem reactiveStreamsServiceFactory) {
+
+ return new CamelBeanBuildItem(
+ SCHEME,
+ "org.apache.camel.component.reactive.streams.ReactiveStreamsComponent",
+ recorder.createReactiveStreamsComponent(reactiveStreamsServiceFactory.getValue()));
+ }
+
+ @Record(ExecutionTime.STATIC_INIT)
+ @BuildStep
+ void publishCamelReactiveStreamsService(
+ BeanContainerBuildItem beanContainer,
+ ReactiveStreamsRecorder recorder,
+ CamelContextBuildItem camelContext,
+ ReactiveStreamsServiceFactoryBuildItem reactiveStreamsServiceFactory) {
+
+ recorder.publishCamelReactiveStreamsService(
+ beanContainer.getValue(),
+ camelContext.getCamelContext(),
+ reactiveStreamsServiceFactory.getValue());
+ }
+}
diff --git a/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsServiceFactoryBuildItem.java b/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsServiceFactoryBuildItem.java
new file mode 100644
index 0000000..f625d77
--- /dev/null
+++ b/extensions/reactive-streams/deployment/src/main/java/org/apache/camel/quarkus/component/reactive/streams/deployment/ReactiveStreamsServiceFactoryBuildItem.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.deployment;
+
+import io.quarkus.builder.item.SimpleBuildItem;
+import io.quarkus.runtime.RuntimeValue;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServiceFactory;
+
+/**
+ * Holder for a {@link CamelReactiveStreamsServiceFactory} instance.
+ */
+final class ReactiveStreamsServiceFactoryBuildItem extends SimpleBuildItem {
+ private final RuntimeValue<CamelReactiveStreamsServiceFactory> value;
+
+ public ReactiveStreamsServiceFactoryBuildItem(RuntimeValue<CamelReactiveStreamsServiceFactory> value) {
+ this.value = value;
+ }
+
+ public RuntimeValue<CamelReactiveStreamsServiceFactory> getValue() {
+ return value;
+ }
+}
diff --git a/extensions/reactive-streams/pom.xml b/extensions/reactive-streams/pom.xml
new file mode 100644
index 0000000..fe62d62
--- /dev/null
+++ b/extensions/reactive-streams/pom.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-build-parent</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../../poms/build-parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>camel-quarkus-reactive-streams-parent</artifactId>
+ <name>Camel Quarkus :: Reactive Streams</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>deployment</module>
+ <module>runtime</module>
+ </modules>
+</project>
diff --git a/extensions/reactive-streams/runtime/pom.xml b/extensions/reactive-streams/runtime/pom.xml
new file mode 100644
index 0000000..6b0b151
--- /dev/null
+++ b/extensions/reactive-streams/runtime/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-reactive-streams-parent</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>camel-quarkus-reactive-streams</artifactId>
+ <name>Camel Quarkus :: Reactive Streams :: Runtime</name>
+
+ <properties>
+ <firstVersion>1.0.0</firstVersion>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-bom</artifactId>
+ <version>${project.version}</version>
+ <type>pom</type>
+ <scope>import</scope>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
+ <artifactId>camel-reactive-streams</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-bootstrap-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <annotationProcessorPaths>
+ <path>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-extension-processor</artifactId>
+ <version>${quarkus.version}</version>
+ </path>
+ </annotationProcessorPaths>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsProducers.java b/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsProducers.java
new file mode 100644
index 0000000..28c217b
--- /dev/null
+++ b/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsProducers.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams;
+
+import javax.enterprise.inject.Produces;
+import javax.inject.Singleton;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServiceFactory;
+
+/**
+ * Producers of CamelReactiveStreams related beans that are injectable via CDI.
+ */
+@Singleton
+public class ReactiveStreamsProducers {
+ private volatile CamelContext camelContext;
+ private volatile CamelReactiveStreamsServiceFactory reactiveStreamsServiceFactory;
+
+ public void init(CamelContext camelContext, CamelReactiveStreamsServiceFactory reactiveStreamsServiceFactory) {
+ this.camelContext = camelContext;
+ this.reactiveStreamsServiceFactory = reactiveStreamsServiceFactory;
+ }
+
+ @Singleton
+ @Produces
+ CamelReactiveStreamsServiceFactory camelReactiveStreamsServiceFactory() {
+ return reactiveStreamsServiceFactory;
+ }
+
+ @Singleton
+ @Produces
+ CamelReactiveStreamsService camelReactiveStreamsService() {
+ return CamelReactiveStreams.get(camelContext);
+ }
+}
diff --git a/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsRecorder.java b/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsRecorder.java
new file mode 100644
index 0000000..2e731d3
--- /dev/null
+++ b/extensions/reactive-streams/runtime/src/main/java/org/apache/camel/quarkus/component/reactive/streams/ReactiveStreamsRecorder.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams;
+
+import io.quarkus.arc.runtime.BeanContainer;
+import io.quarkus.runtime.RuntimeValue;
+import io.quarkus.runtime.annotations.Recorder;
+import org.apache.camel.CamelContext;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServiceFactory;
+import org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsServiceFactory;
+import org.apache.camel.support.service.ServiceHelper;
+
+@Recorder
+public class ReactiveStreamsRecorder {
+ public RuntimeValue<CamelReactiveStreamsServiceFactory> createDefaultReactiveStreamsServiceFactory() {
+ return new RuntimeValue<>(new DefaultCamelReactiveStreamsServiceFactory());
+ }
+
+ public RuntimeValue<ReactiveStreamsComponent> createReactiveStreamsComponent(
+ RuntimeValue<CamelReactiveStreamsServiceFactory> serviceFactory) {
+ return new RuntimeValue<>(new QuarkusReactiveStreamsComponent(serviceFactory.getValue()));
+ }
+
+ @SuppressWarnings("unchecked")
+ public void publishCamelReactiveStreamsService(
+ BeanContainer beanContainer,
+ RuntimeValue<CamelContext> camelContext,
+ RuntimeValue<CamelReactiveStreamsServiceFactory> serviceFactory) {
+
+ // register to the container
+ beanContainer.instance(ReactiveStreamsProducers.class).init(
+ camelContext.getValue(),
+ serviceFactory.getValue());
+ }
+
+ private static class QuarkusReactiveStreamsComponent extends ReactiveStreamsComponent {
+ private final CamelReactiveStreamsServiceFactory reactiveStreamServiceFactory;
+ private final Object lock;
+ private CamelReactiveStreamsService reactiveStreamService;
+
+ public QuarkusReactiveStreamsComponent(CamelReactiveStreamsServiceFactory reactiveStreamServiceFactory) {
+ this.reactiveStreamServiceFactory = reactiveStreamServiceFactory;
+ this.lock = new Object();
+ }
+
+ @Override
+ public CamelReactiveStreamsService getReactiveStreamsService() {
+ synchronized (this.lock) {
+ if (reactiveStreamService == null) {
+ this.reactiveStreamService = reactiveStreamServiceFactory.newInstance(
+ getCamelContext(),
+ getInternalEngineConfiguration());
+
+ try {
+ // Start the service and add it to the Camel context to expose managed attributes
+ getCamelContext().addService(this.reactiveStreamService, true, true);
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+ }
+ }
+
+ return this.reactiveStreamService;
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ ServiceHelper.stopService(this.reactiveStreamService);
+ this.reactiveStreamService = null;
+
+ super.doStop();
+ }
+ }
+}
diff --git a/extensions/reactive-streams/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/reactive-streams/runtime/src/main/resources/META-INF/quarkus-extension.yaml
new file mode 100644
index 0000000..c38c2ba
--- /dev/null
+++ b/extensions/reactive-streams/runtime/src/main/resources/META-INF/quarkus-extension.yaml
@@ -0,0 +1,27 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+---
+name: "Camel Quarkus Reactive Streams"
+description: "Camel Reactive Streams component"
+metadata:
+ keywords:
+ - "camel"
+ - "reactive-streams"
+ guide: "https://quarkus.io/guides/camel"
+ categories:
+ - "integration"
\ No newline at end of file
diff --git a/extensions/readme.adoc b/extensions/readme.adoc
index 9a2f8f3..0793702 100644
--- a/extensions/readme.adoc
+++ b/extensions/readme.adoc
@@ -5,7 +5,7 @@
== Camel Components
// components: START
-Number of Camel components: 60 in 51 JAR artifacts (0 deprecated)
+Number of Camel components: 61 in 52 JAR artifacts (0 deprecated)
[width="100%",cols="4,1,5",options="header"]
|===
@@ -131,6 +131,9 @@
| xref:extensions/platform-http.adoc[Platform HTTP] (camel-quarkus-platform-http) +
`platform-http:path` | 0.3.0 | HTTP service leveraging existing runtime platform HTTP server
+| link:https://camel.apache.org/components/latest/reactive-streams-component.html[Reactive Streams] (camel-quarkus-reactive-streams) +
+`reactive-streams:stream` | 1.0.0 | Reactive Camel using reactive streams
+
| link:https://camel.apache.org/components/latest/rest-component.html[REST] (camel-quarkus-rest) +
`rest:method:path:uriTemplate` | 0.2.0 | The rest component is used for either hosting REST services (consumer) or calling external REST services (producer).
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 217d8c6..6fe866b 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -186,6 +186,7 @@
<module>pdf</module>
<module>platform-http</module>
<module>platform-http-engine</module>
+ <module>reactive-streams</module>
<module>salesforce</module>
<module>scheduler</module>
<module>seda</module>
diff --git a/integration-tests/reactive-streams/pom.xml b/integration-tests/reactive-streams/pom.xml
new file mode 100644
index 0000000..af1ca05
--- /dev/null
+++ b/integration-tests/reactive-streams/pom.xml
@@ -0,0 +1,126 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-integration-tests</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>camel-quarkus-integration-test-reactive-streams</artifactId>
+ <name>Camel Quarkus :: Integration Tests :: Reactive Streams</name>
+ <description>Integration tests for Camel Quarkus Reactive Streams extension</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-reactive-streams</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-direct</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-resteasy-jsonb</artifactId>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-junit5</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.rest-assured</groupId>
+ <artifactId>rest-assured</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>native</id>
+ <activation>
+ <property>
+ <name>native</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <systemProperties>
+ <native.image.path>${project.build.directory}/${project.build.finalName}-runner</native.image.path>
+ </systemProperties>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>io.quarkus</groupId>
+ <artifactId>quarkus-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>native-image</id>
+ <goals>
+ <goal>native-image</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+</project>
diff --git a/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsResource.java b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsResource.java
new file mode 100644
index 0000000..b264dae
--- /dev/null
+++ b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsResource.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.it;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+import javax.json.Json;
+import javax.json.JsonObject;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.FluentProducerTemplate;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
+import org.apache.camel.component.reactive.streams.ReactiveStreamsEndpoint;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsServiceFactory;
+import org.apache.camel.quarkus.component.reactive.streams.it.support.TestSubscriber;
+
+@Path("/reactive-streams")
+@ApplicationScoped
+public class ReactiveStreamsResource {
+ @Inject
+ CamelContext camelContext;
+ @Inject
+ FluentProducerTemplate producerTemplate;
+ @Inject
+ CamelReactiveStreamsService reactiveStreamsService;
+ @Inject
+ CamelReactiveStreamsServiceFactory reactiveStreamsServiceFactory;
+
+ @Path("/inspect")
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public JsonObject get() {
+ ReactiveStreamsComponent component = camelContext.getComponent("reactive-streams", ReactiveStreamsComponent.class);
+ ReactiveStreamsEndpoint endpoint = camelContext.getEndpointRegistry().values().stream()
+ .filter(ReactiveStreamsEndpoint.class::isInstance)
+ .map(ReactiveStreamsEndpoint.class::cast)
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("Unable to find and endpoint of type ReactiveStreamsEndpoint"));
+
+ return Json.createObjectBuilder()
+ .add("reactive-streams-component-type", component.getClass().getName())
+ .add("reactive-streams-component-backpressure-strategy", component.getBackpressureStrategy().toString())
+ .add("reactive-streams-endpoint-backpressure-strategy", endpoint.getBackpressureStrategy().toString())
+ .add("reactive-streams-service-type", reactiveStreamsService.getClass().getName())
+ .add("reactive-streams-service-factory-type", reactiveStreamsServiceFactory.getClass().getName())
+ .build();
+ }
+
+ @Path("/to-upper")
+ @POST
+ @Produces(MediaType.TEXT_PLAIN)
+ public String toUpper(String payload) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<String> result = new AtomicReference<>();
+
+ TestSubscriber<String> subscriber = TestSubscriber.onNext(data -> {
+ result.set(data);
+ latch.countDown();
+ });
+
+ subscriber.setInitiallyRequested(1);
+ reactiveStreamsService.fromStream("toUpper", String.class).subscribe(subscriber);
+
+ producerTemplate.to("direct:toUpper").withBody(payload).send();
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ return result.get();
+ }
+}
diff --git a/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsRoute.java b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsRoute.java
new file mode 100644
index 0000000..c085a93
--- /dev/null
+++ b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsRoute.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.it;
+
+import org.apache.camel.builder.RouteBuilder;
+
+public class ReactiveStreamsRoute extends RouteBuilder {
+ @Override
+ public void configure() throws Exception {
+ from("direct:toUpper")
+ .routeId("toUpper")
+ .setBody().body(String.class, s -> s.toUpperCase())
+ .to("reactive-streams:toUpper?backpressureStrategy=BUFFER");
+ }
+}
diff --git a/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/support/TestSubscriber.java b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/support/TestSubscriber.java
new file mode 100644
index 0000000..53cabe4
--- /dev/null
+++ b/integration-tests/reactive-streams/src/main/java/org/apache/camel/quarkus/component/reactive/streams/it/support/TestSubscriber.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.it.support;
+
+import java.util.function.Consumer;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+public class TestSubscriber<T> implements Subscriber<T> {
+ protected Subscription subscription;
+
+ private long initiallyRequested;
+
+ public TestSubscriber() {
+ }
+
+ public static <V> TestSubscriber<V> onNext(Consumer<V> consumer) {
+ return new TestSubscriber<V>() {
+ @Override
+ public void onNext(V data) {
+ consumer.accept(data);
+ }
+ };
+ }
+
+ public long getInitiallyRequested() {
+ return initiallyRequested;
+ }
+
+ public void setInitiallyRequested(long initiallyRequested) {
+ this.initiallyRequested = initiallyRequested;
+ }
+
+ public void request(long exchanges) {
+ this.subscription.request(exchanges);
+ }
+
+ public void cancel() {
+ this.subscription.cancel();
+ }
+
+ @Override
+ public void onSubscribe(Subscription subscription) {
+ this.subscription = subscription;
+
+ if (initiallyRequested > 0) {
+ subscription.request(initiallyRequested);
+ }
+ }
+
+ @Override
+ public void onNext(T t) {
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ }
+
+ @Override
+ public void onComplete() {
+ }
+}
diff --git a/integration-tests/reactive-streams/src/main/resources/application.properties b/integration-tests/reactive-streams/src/main/resources/application.properties
new file mode 100644
index 0000000..0943243
--- /dev/null
+++ b/integration-tests/reactive-streams/src/main/resources/application.properties
@@ -0,0 +1,32 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+#
+# Quarkus
+#
+quarkus.log.file.enable = false
+
+#
+# Camel
+#
+camel.context.name = quarkus-camel-example
+
+#
+# Camel :: Reactive Streams
+#
+
+camel.component.reactive-streams.backpressure-strategy = LATEST
\ No newline at end of file
diff --git a/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsIT.java b/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsIT.java
new file mode 100644
index 0000000..2f25623
--- /dev/null
+++ b/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsIT.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.it;
+
+import io.quarkus.test.junit.NativeImageTest;
+
+@NativeImageTest
+class ReactiveStreamsIT extends ReactiveStreamsTest {
+
+}
diff --git a/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsTest.java b/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsTest.java
new file mode 100644
index 0000000..702be1d
--- /dev/null
+++ b/integration-tests/reactive-streams/src/test/java/org/apache/camel/quarkus/component/reactive/streams/it/ReactiveStreamsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.reactive.streams.it;
+
+import io.quarkus.test.junit.QuarkusTest;
+import io.restassured.RestAssured;
+import io.restassured.path.json.JsonPath;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@QuarkusTest
+class ReactiveStreamsTest {
+ @Test
+ public void reactiveStreamsService() {
+ JsonPath result = RestAssured.get("/reactive-streams/inspect")
+ .then()
+ .statusCode(200)
+ .extract()
+ .body()
+ .jsonPath();
+
+ assertThat(result.getString("reactive-streams-component-type")).isEqualTo(
+ "org.apache.camel.quarkus.component.reactive.streams.ReactiveStreamsRecorder$QuarkusReactiveStreamsComponent");
+ assertThat(result.getString("reactive-streams-component-backpressure-strategy")).isEqualTo(
+ "LATEST");
+ assertThat(result.getString("reactive-streams-endpoint-backpressure-strategy")).isEqualTo(
+ "BUFFER");
+ assertThat(result.getString("reactive-streams-service-type")).isEqualTo(
+ "org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService");
+ assertThat(result.getString("reactive-streams-service-factory-type")).isEqualTo(
+ "org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsServiceFactory");
+ }
+
+ @Test
+ public void subscriber() {
+ final String payload = "test";
+
+ RestAssured.given()
+ .body(payload)
+ .post("/reactive-streams/to-upper")
+ .then()
+ .statusCode(200)
+ .body(is(payload.toUpperCase()));
+ }
+
+}
diff --git a/poms/bom-deployment/pom.xml b/poms/bom-deployment/pom.xml
index 1f53a45..e258e26 100644
--- a/poms/bom-deployment/pom.xml
+++ b/poms/bom-deployment/pom.xml
@@ -321,6 +321,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-reactive-streams-deployment</artifactId>
+ <version>${camel-quarkus.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-rest-deployment</artifactId>
<version>${camel-quarkus.version}</version>
</dependency>
diff --git a/poms/bom/pom.xml b/poms/bom/pom.xml
index a0aa572..ddcef6b 100644
--- a/poms/bom/pom.xml
+++ b/poms/bom/pom.xml
@@ -393,6 +393,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
+ <artifactId>camel-reactive-streams</artifactId>
+ <version>${camel.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel</groupId>
<artifactId>camel-rest</artifactId>
<version>${camel.version}</version>
</dependency>
@@ -766,6 +771,11 @@
</dependency>
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
+ <artifactId>camel-quarkus-reactive-streams</artifactId>
+ <version>${camel-quarkus.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-rest</artifactId>
<version>${camel-quarkus.version}</version>
</dependency>