| = Reactive Streams Component |
| :doctitle: Reactive Streams |
| :shortname: reactive-streams |
| :artifactid: camel-reactive-streams |
| :description: Exchange messages with reactive stream processing libraries compatible with the reactive streams standard. |
| :since: 2.19 |
| :supportlevel: Stable |
| :tabs-sync-option: |
| :component-header: Both producer and consumer are supported |
| //Manually maintained attributes |
| :camel-spring-boot-name: reactive-streams |
| |
| *Since Camel {since}* |
| |
| *{component-header}* |
| |
| The Reactive Streams component allows you to exchange messages with reactive |
| stream processing libraries compatible with the |
| http://www.reactive-streams.org/[reactive streams] standard. |
| |
| The component supports backpressure and has been tested using the https://github.com/reactive-streams/reactive-streams-jvm/tree/master/tck[reactive streams technology |
| compatibility kit (TCK)]. |
| |
| The Camel module provides a *reactive-streams* component that allows users to define incoming and |
| outgoing streams within Camel routes, and a direct client API that allows using Camel endpoints |
| directly from any external reactive framework. |
| |
| Camel uses an internal implementation of the reactive streams |
| _Publisher_ and _Subscriber_, so it's not tied to any specific framework. |
| The following reactive frameworks have been used in the integration tests: https://github.com/reactor/reactor-core[Reactor Core 3], https://github.com/ReactiveX/RxJava[RxJava 2]. |
| |
| Maven users will need to add the following dependency to their `pom.xml` |
| for this component: |
| |
| [source,xml] |
| ------------------------------------------------------------ |
| <dependency> |
| <groupId>org.apache.camel</groupId> |
| <artifactId>camel-reactive-streams</artifactId> |
| <version>x.x.x</version> |
| <!-- use the same version as your Camel core version --> |
| </dependency> |
| ------------------------------------------------------------ |
| |
| == URI format |
| |
| ------------------------------------------------- |
| reactive-streams://stream?[options] |
| ------------------------------------------------- |
| |
| Where *stream* is a logical stream name used to bind Camel routes to the |
| external stream processing systems. |
| |
| // component-configure options: START |
| |
| // component-configure options: END |
| |
| // component options: START |
| include::partial$component-configure-options.adoc[] |
| include::partial$component-endpoint-options.adoc[] |
| // component options: END |
| |
| // endpoint options: START |
| |
| // endpoint options: END |
| // component headers: START |
| include::partial$component-endpoint-headers.adoc[] |
| // component headers: END |
| |
| == Usage |
| |
| The library aims to support all the communication modes an application needs to interact with Camel data: |
| |
| * *Get* data from Camel routes (In-Only from Camel) |
| * *Send* data to Camel routes (In-Only towards Camel) |
| * *Request* a transformation from a Camel route (In-Out towards Camel) |
| * *Process* data flowing from a Camel route using a reactive processing step (In-Out from Camel) |
| |
| == Getting data from Camel |
| To subscribe to data flowing from a Camel route, exchanges should be redirected to |
| a named stream, as in the following snippet: |
| |
| [source,java] |
| --------------------------------------------------------- |
| from("timer:clock") |
| .setBody().header(Exchange.TIMER_COUNTER) |
| .to("reactive-streams:numbers"); |
| --------------------------------------------------------- |
| |
| Routes can also be written using the XML DSL. |
| |
| In the example, an unbounded stream of numbers is associated with the name `numbers`. |
| The stream can be accessed using the `CamelReactiveStreams` utility class. |
| |
| [source,java] |
| --------------------------------------------------------- |
| CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); |
| |
| // Getting a stream of exchanges |
| Publisher<Exchange> exchanges = camel.fromStream("numbers"); |
| |
| // Getting a stream of Integers (using Camel standard conversion system) |
| Publisher<Integer> numbers = camel.fromStream("numbers", Integer.class); |
| --------------------------------------------------------- |
| |
| The stream can be used easily with any reactive streams compatible library. |
| Here is an example of how to use it with https://github.com/ReactiveX/RxJava[RxJava 2] |
| (although any reactive framework can be used to process events). |
| |
| [source,java] |
| --------------------------------------------------------- |
| Flowable.fromPublisher(numbers) |
| .doOnNext(System.out::println) |
| .subscribe(); |
| --------------------------------------------------------- |
| |
| The example prints all numbers generated by Camel into `System.out`. |
| |
| === Getting data from Camel using the direct API |
| |
| For short Camel routes and for users that prefer defining the whole processing flow |
| using functional constructs of the reactive framework (without using the Camel DSL at all), |
| streams can also be defined using Camel URIs. |
| |
| [source,java] |
| --------------------------------------------------------- |
| CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); |
| |
| // Get a stream from all the files in a directory |
| Publisher<String> files = camel.from("file:folder", String.class); |
| |
| // Use the stream in RxJava |
| Flowable.fromPublisher(files) |
| .doOnNext(System.out::println) |
| .subscribe(); |
| --------------------------------------------------------- |
| |
| == Sending data to Camel |
| When an external library needs to push events into a Camel route, the Reactive Streams |
| endpoint must be used as a consumer. |
| |
| [source,java] |
| --------------------------------------------------------- |
| from("reactive-streams:elements") |
| .to("log:INFO"); |
| --------------------------------------------------------- |
| |
| A handle to the `elements` stream can be obtained from the `CamelReactiveStreams` utility class. |
| |
| [source,java] |
| --------------------------------------------------------- |
| CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); |
| |
| Subscriber<String> elements = camel.streamSubscriber("elements", String.class); |
| --------------------------------------------------------- |
| |
| The subscriber can be used to push events to the Camel route that consumes from the `elements` stream. |
| |
| Here is an example of how to use it with https://github.com/ReactiveX/RxJava[RxJava 2] |
| (although any reactive framework can be used to publish events). |
| |
| [source,java] |
| --------------------------------------------------------- |
| Flowable.interval(1, TimeUnit.SECONDS) |
| .map(i -> "Item " + i) |
| .subscribe(elements); |
| --------------------------------------------------------- |
| |
| In the example, RxJava generates a string item every second and pushes it into the Camel route defined above. |
| |
| === Sending data to Camel using the direct API |
| |
| In this case too, the direct API can be used to obtain a Camel subscriber from an endpoint URI. |
| |
| [source,java] |
| --------------------------------------------------------- |
| CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); |
| |
| // Send two strings to the "seda:queue" endpoint |
| Flowable.just("hello", "world") |
| .subscribe(camel.subscriber("seda:queue", String.class)); |
| --------------------------------------------------------- |
| |
| == Request a transformation to Camel |
| |
| Routes defined in any Camel DSL can be used within a reactive stream framework to perform a |
| specific transformation (the same mechanism can also be used, e.g., to send data to an _http_ endpoint and continue). |
| |
| The following snippet shows how RxJava functional code can delegate the task of loading and marshalling files to Camel. |
| |
| [source,java] |
| --------------------------------------------------------- |
| CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); |
| |
| // Process files starting from their names |
| Flowable.just(new File("file1.txt"), new File("file2.txt")) |
| .flatMap(file -> camel.toStream("readAndMarshal", file, String.class)) |
| // Camel output will be converted to String |
| // other steps |
| .subscribe(); |
| --------------------------------------------------------- |
| |
| For this to work, a route like the following should be defined in the Camel context: |
| |
| [source,java] |
| --------------------------------------------------------- |
| from("reactive-streams:readAndMarshal") |
| .marshal() // ... other details |
| --------------------------------------------------------- |
| |
| === Request a transformation to Camel using the direct API |
| |
| An alternative approach is to use the endpoint URIs directly in the reactive flow: |
| |
| [source,java] |
| --------------------------------------------------------- |
| CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); |
| |
| // Process files starting from their names |
| Flowable.just(new File("file1.txt"), new File("file2.txt")) |
| .flatMap(file -> camel.to("direct:process", file, String.class)) |
| // Camel output will be converted to String |
| // other steps |
| .subscribe(); |
| --------------------------------------------------------- |
| |
| When using the _to()_ method instead of _toStream()_, there is no need to define the |
| route using "reactive-streams:" endpoints (although they are used under the hood). |
| |
| In this case, the Camel transformation can be as simple as: |
| |
| [source,java] |
| --------------------------------------------------------- |
| from("direct:process") |
| .marshal() // ... other details |
| --------------------------------------------------------- |
| |
| |
| == Process Camel data into the reactive framework |
| |
| While a reactive streams _Publisher_ allows exchanging data in a unidirectional way, |
| Camel routes often use an in-out exchange pattern (e.g., to define REST endpoints and, in general, |
| wherever a reply is needed for each request). |
| |
| In these circumstances, users can add a reactive processing step to the flow, to enhance a Camel route or to |
| define the entire transformation using the reactive framework. |
| |
| For example, given the following route: |
| |
| [source,java] |
| --------------------------------------------------------- |
| from("timer:clock") |
| .setBody().header(Exchange.TIMER_COUNTER) |
| .to("direct:reactive") |
| .log("Continue with Camel route... n=${body}"); |
| --------------------------------------------------------- |
| |
| A reactive processing step can be associated with the "direct:reactive" endpoint: |
| |
| [source,java] |
| --------------------------------------------------------- |
| CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); |
| |
| camel.process("direct:reactive", Integer.class, items -> |
| Flowable.fromPublisher(items) // RxJava |
| .map(n -> -n)); // make every number negative |
| --------------------------------------------------------- |
| |
| Data flowing in the Camel route is processed by the external reactive |
| framework and then continues through the processing flow inside Camel. |
| |
| This mechanism can also be used to define an In-Out exchange in a completely |
| reactive way. |
| |
| [source,java] |
| --------------------------------------------------------- |
| CamelReactiveStreamsService camel = CamelReactiveStreams.get(context); |
| |
| // requires a rest-capable Camel component |
| camel.process("rest:get:orders", exchange -> |
| Flowable.fromPublisher(exchange) |
| .flatMap(ex -> allOrders())); // retrieve orders asynchronously |
| --------------------------------------------------------- |
| |
| See Camel examples (*camel-example-reactive-streams*) for details. |
| |
| == Advanced Topics |
| === Controlling Backpressure (producer side) |
| |
| When routing Camel exchanges to an external subscriber, backpressure is handled by an internal buffer that caches exchanges |
| before delivering them. |
| If the subscriber consumes exchanges more slowly than the route produces them, the buffer can grow without bound, which must be avoided in many circumstances. |
| |
| Consider the following route: |
| |
| [source,java] |
| --------------------------------------------------------- |
| from("jms:queue") |
| .to("reactive-streams:flow"); |
| --------------------------------------------------------- |
| |
| If the JMS queue contains a large number of messages and the Subscriber associated with the `flow` stream is too slow, |
| messages are dequeued from JMS and appended to the buffer, possibly causing an out-of-memory error. |
| To avoid such problems, a `ThrottlingInflightRoutePolicy` can be set in the route. |
| |
| [source,java] |
| --------------------------------------------------------- |
| ThrottlingInflightRoutePolicy policy = new ThrottlingInflightRoutePolicy(); |
| policy.setMaxInflightExchanges(10); |
| |
| from("jms:queue") |
| .routePolicy(policy) |
| .to("reactive-streams:flow"); |
| --------------------------------------------------------- |
| |
| The policy limits the maximum number of active exchanges (and so the maximum size of the buffer), |
| keeping it lower than the threshold (`10` in the example). |
| When more than `10` messages are in flight, the route is suspended, waiting for the subscriber to process them. |
| |
| With this mechanism, the subscriber controls the route suspension/resume automatically, through backpressure. |
| When multiple subscribers are consuming items from the same stream, the slowest one controls the route status automatically. |
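| |
| The suspend/resume behaviour described above can be pictured with a small model. The following is a minimal sketch in plain Java, not Camel's `ThrottlingInflightRoutePolicy` implementation: the class `InflightGate`, its method names, and the `resumeAt` threshold are hypothetical and introduced here for illustration only. |
| |
```java
/**
 * Illustrative model of in-flight throttling (not Camel code).
 * The gate closes when more than maxInflight exchanges are in flight
 * and reopens once the count has dropped back to resumeAt.
 */
final class InflightGate {
    private final int maxInflight; // suspend when the in-flight count exceeds this
    private final int resumeAt;    // assumed resume threshold for this sketch
    private int inflight;
    private boolean suspended;

    InflightGate(int maxInflight, int resumeAt) {
        if (maxInflight < 1 || resumeAt < 0 || resumeAt > maxInflight) {
            throw new IllegalArgumentException("need maxInflight >= 1 and 0 <= resumeAt <= maxInflight");
        }
        this.maxInflight = maxInflight;
        this.resumeAt = resumeAt;
    }

    /** Called when an exchange enters the route. */
    synchronized void exchangeStarted() {
        inflight++;
        if (inflight > maxInflight) {
            suspended = true;
        }
    }

    /** Called when the subscriber has finished with an exchange. */
    synchronized void exchangeDone() {
        if (inflight == 0) {
            throw new IllegalStateException("no exchange in flight");
        }
        inflight--;
        if (suspended && inflight <= resumeAt) {
            suspended = false;
        }
    }

    synchronized boolean isSuspended() {
        return suspended;
    }

    synchronized int inflight() {
        return inflight;
    }
}
```
| |
| With `new InflightGate(10, 7)`, the gate stays open for the first ten exchanges, closes on the eleventh, and reopens once the subscriber has brought the in-flight count back down to seven. |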
| |
| In other circumstances, e.g., when using an `http` consumer, suspending the route makes the http service unavailable, so |
| the default configuration (no policy, unbounded buffer) is preferable. Users should try to avoid memory issues |
| by limiting the number of requests to the http service (e.g., by scaling out). |
| |
| In contexts where a certain amount of data loss is acceptable, setting a backpressure strategy other than `BUFFER` can |
| be a solution for dealing with fast sources. |
| |
| [source,java] |
| --------------------------------------------------------- |
| from("direct:thermostat") |
| .to("reactive-streams:flow?backpressureStrategy=LATEST"); |
| --------------------------------------------------------- |
| |
| When the `LATEST` backpressure strategy is used, only the last exchange received from the route is kept by the publisher, while older data is discarded (other options are available). |
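| |
| The effect of `LATEST` can be pictured as a single-slot buffer in which each new value overwrites the previous undelivered one. The following is a minimal sketch under that assumption; the `LatestSlot` class is hypothetical and is not part of Camel. |
| |
```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative single-slot buffer that keeps only the most recent item (not Camel code). */
final class LatestSlot<T> {
    private final AtomicReference<T> slot = new AtomicReference<>();

    /**
     * Stores the new item, replacing any item that has not been taken yet.
     * Returns the item that was dropped, or null if the slot was empty.
     */
    T offer(T item) {
        return slot.getAndSet(item);
    }

    /** Takes the pending item and empties the slot; returns null if nothing is pending. */
    T poll() {
        return slot.getAndSet(null);
    }
}
```
| |
| If a thermostat offers the readings `20`, `21`, and `22` before the subscriber asks for data, a single `poll()` returns `22` and the two older readings are gone. |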
| |
| === Controlling Backpressure (consumer side) |
| |
| When Camel consumes items from a reactive-streams publisher, the maximum number of inflight exchanges can be set as an endpoint option. |
| |
| The subscriber associated with the consumer interacts with the publisher to keep the number of messages in the route lower than the threshold. |
| |
| An example of backpressure-aware route: |
| |
| [source,java] |
| --------------------------------------------------------- |
| from("reactive-streams:numbers?maxInflightExchanges=10") |
| .to("direct:endpoint"); |
| --------------------------------------------------------- |
| |
| The number of items that Camel requests from the source publisher (through the reactive streams backpressure mechanism) |
| is always lower than `10`. Messages are processed by a single thread on the Camel side. |
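| |
| To show how a subscriber can bound its demand, the following sketch uses the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the reactive streams interfaces, rather than Camel's own subscriber. `BoundedDemandDemo`, `WindowedSubscriber`, and the `window` parameter are illustrative names, and the replenishing rule (request one more item after each item handled) is an assumption of this sketch, not a description of Camel internals. |
| |
```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BoundedDemandDemo {

    /** Never has more than `window` requested-but-undelivered items outstanding. */
    static final class WindowedSubscriber<T> implements Flow.Subscriber<T> {
        private final int window;
        private final List<T> received = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int outstanding;    // requested but not yet delivered
        private int maxOutstanding; // high-water mark, kept for inspection

        WindowedSubscriber(int window) {
            this.window = window;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            outstanding = window;
            maxOutstanding = window;
            s.request(window); // initial demand: fill the window
        }

        @Override
        public void onNext(T item) {
            outstanding--;        // one requested item has arrived
            received.add(item);   // stand-in for routing the item
            outstanding++;        // replenish exactly one slot
            maxOutstanding = Math.max(maxOutstanding, outstanding);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }

        List<T> received() {
            return received;
        }

        int maxOutstanding() {
            return maxOutstanding;
        }
    }

    /** Publishes 1..count and returns the subscriber once the stream has completed. */
    static WindowedSubscriber<Integer> run(int count, int window) {
        WindowedSubscriber<Integer> subscriber = new WindowedSubscriber<>(window);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() completes the stream after pending items are delivered
        try {
            if (!subscriber.done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber;
    }

    public static void main(String[] args) {
        WindowedSubscriber<Integer> s = run(100, 10);
        System.out.println("items received: " + s.received().size());
        System.out.println("max outstanding demand: " + s.maxOutstanding());
    }
}
```
| |
| The publisher may only deliver what has been requested, so the subscriber, not the source, decides how many items can be pending at any time. |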
| |
| The number of concurrent consumers (threads) can also be set as an endpoint option (`concurrentConsumers`). |
| When using 1 consumer (the default), the order of items in the source stream is maintained. |
| When this value is increased, items are processed concurrently by multiple threads, so their order is not preserved. |
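| |
| The ordering trade-off is a general property of handing work to a thread pool and can be reproduced without Camel. The sketch below is plain Java with hypothetical names (`ConsumerOrderingDemo`, `process`); it records the order in which items finish when handled by a given number of worker threads. |
| |
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ConsumerOrderingDemo {

    /** Handles every item on a pool of `consumers` threads and returns them in completion order. */
    static List<Integer> process(List<Integer> items, int consumers) {
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        List<Integer> completed = Collections.synchronizedList(new ArrayList<>());
        for (Integer item : items) {
            pool.execute(() -> completed.add(item)); // stand-in for processing an exchange
        }
        pool.shutdown(); // accept no new tasks; queued ones still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for consumers");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(completed);
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println("1 consumer:  " + process(items, 1));
        System.out.println("4 consumers: " + process(items, 4));
    }
}
```
| |
| With one worker the tasks run in submission order, so the result matches the input. With several workers the same items are all processed, but their completion order can differ from run to run. |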
| |
| |
| |
| include::spring-boot:partial$starter.adoc[] |