| //// |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| https://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| //// |
| = Reactive client for Apache Pulsar |
| |
| :github: https://github.com/apache/pulsar-client-reactive |
| :latest_version: 0.3.0 |
| |
| Reactive client for Apache Pulsar which is compatible with the Reactive Streams specification. |
| This uses Project Reactor as the Reactive Streams implementation. |
| |
| == Getting it |
| |
| *This library requires Java 8 or + to run*. |
| |
| With Gradle: |
| |
| [source,groovy,subs="verbatim,attributes"] |
| ---- |
| dependencies { |
| implementation "org.apache.pulsar:pulsar-client-reactive-adapter:{latest_version}" |
| } |
| ---- |
| |
| With Maven: |
| |
| [source,xml,subs="verbatim,attributes"] |
| ---- |
| <dependencies> |
| <dependency> |
| <groupId>org.apache.pulsar</groupId> |
| <artifactId>pulsar-client-reactive-adapter</artifactId> |
| <version>{latest_version}</version> |
| </dependency> |
| </dependencies> |
| ---- |
| |
| == Usage |
| |
| === Initializing the library |
| |
| ==== In standalone application |
| |
| Using an existing PulsarClient instance: |
| |
| [source,java] |
| ---- |
| ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); |
| ---- |
| |
| === Sending messages |
| |
| [source,java] |
| ---- |
| ReactiveMessageSender<String> messageSender = reactivePulsarClient |
| .messageSender(Schema.STRING) |
| .topic(topicName) |
| .maxInflight(100) |
| .build(); |
| Mono<MessageId> messageId = messageSender |
| .sendOne(MessageSpec.of("Hello world!")); |
| // for demonstration |
| messageId.subscribe(System.out::println); |
| ---- |
| |
| === Sending messages with cached producer |
| |
| By default, a ConcurrentHashMap based cache is used. |
| It's recommended to use a more advanced cache based on Caffeine. |
| The cache will get used as the default implementation when it is on the classpath. |
| |
| Adding Caffeine based producer cache with Gradle: |
| |
| [source,groovy,subs="verbatim,attributes"] |
| ---- |
| dependencies { |
| implementation "org.apache.pulsar:pulsar-client-reactive-adapter:{latest_version}" |
| implementation "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine:{latest_version}" |
| } |
| ---- |
| |
| Adding Caffeine based producer cache with Maven: |
| |
| [source,xml,subs="verbatim,attributes"] |
| ---- |
| <dependencies> |
| <dependency> |
| <groupId>org.apache.pulsar</groupId> |
| <artifactId>pulsar-client-reactive-adapter</artifactId> |
| <version>{latest_version}</version> |
| </dependency> |
| <dependency> |
| <groupId>org.apache.pulsar</groupId> |
| <artifactId>pulsar-client-reactive-producer-cache-caffeine</artifactId> |
| <version>{latest_version}</version> |
| </dependency> |
| </dependencies> |
| ---- |
| |
| Usage example of cache |
| |
| [source,java] |
| ---- |
| ReactiveMessageSender<String> messageSender = reactivePulsarClient |
| .messageSender(Schema.STRING) |
| .cache(AdaptedReactivePulsarClientFactory.createCache()) |
| .topic(topicName) |
| .maxInflight(100) |
| .build(); |
| Mono<MessageId> messageId = messageSender |
| .sendOne(MessageSpec.of("Hello world!")); |
| // for demonstration |
| messageId.subscribe(System.out::println); |
| ---- |
| |
| It is recommended to use a cached producer in most cases. |
| The cache enables reusing the Pulsar Producer instance and related resources across multiple message sending calls. |
| This improves performance since a producer won't have to be created and closed before and after sending a message. |
| |
| The adapter library implementation together with the cache implementation will also enable reactive backpressure for sending messages. |
| The `maxInflight` setting will limit the number of messages that are pending from the client to the broker. |
| The solution will limit reactive streams subscription requests to keep the number of pending messages under the defined limit. |
| This limit is per-topic and impacts the local JVM only. |
| |
| === Shaded version of Caffeine |
| A version of the provider is available that shades it usage of Caffeine. |
| This is useful in scenarios where there is another version of Caffeine required in your application or if you do not want Caffeine on the classpath. |
| |
| Adding shaded Caffeine based producer cache with Gradle: |
| |
| [source,groovy,subs="verbatim,attributes"] |
| ---- |
| dependencies { |
| implementation "org.apache.pulsar:pulsar-client-reactive-adapter:{latest_version}" |
| implementation "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine-shaded:{latest_version}" |
| } |
| ---- |
| |
| Adding shaded Caffeine based producer cache with Maven: |
| |
| [source,xml,subs="verbatim,attributes"] |
| ---- |
| <dependencies> |
| <dependency> |
| <groupId>org.apache.pulsar</groupId> |
| <artifactId>pulsar-client-reactive-adapter</artifactId> |
| <version>{latest_version}</version> |
| </dependency> |
| <dependency> |
| <groupId>org.apache.pulsar</groupId> |
| <artifactId>pulsar-client-reactive-producer-cache-caffeine-shaded</artifactId> |
| <version>{latest_version}</version> |
| </dependency> |
| </dependencies> |
| ---- |
| |
| |
| === Reading messages |
| |
| Reading all messages for a topic: |
| |
| [source,java] |
| ---- |
| ReactiveMessageReader<String> messageReader = |
| reactivePulsarClient.messageReader(Schema.STRING) |
| .topic(topicName) |
| .build(); |
| messageReader.readMany() |
| .map(Message::getValue) |
| // for demonstration |
| .subscribe(System.out::println); |
| ---- |
| |
| By default, the stream will complete when the tail of the topic is reached. |
| |
| ==== Example: poll for up to 5 new messages and stop polling when a timeout occurs |
| |
| With `.endOfStreamAction(EndOfStreamAction.POLL)` the Reader will poll for new messages when the reader reaches the end of the topic. |
| |
| [source,java] |
| ---- |
| ReactiveMessageReader<String> messageReader = |
| reactivePulsarClient.messageReader(Schema.STRING) |
| .topic(topicName) |
| .startAtSpec(StartAtSpec.ofLatest()) |
| .endOfStreamAction(EndOfStreamAction.POLL) |
| .build(); |
| messageReader.readMany() |
| .take(Duration.ofSeconds(5)) |
| .take(5) |
| // for demonstration |
| .subscribe(System.out::println); |
| ---- |
| |
| === Consuming messages |
| |
| [source,java] |
| ---- |
| ReactiveMessageConsumer<String> messageConsumer= |
| reactivePulsarClient.messageConsumer(Schema.STRING) |
| .topic(topicName) |
| .subscriptionName("sub") |
| .build(); |
| messageConsumer.consumeMany(messageFlux -> |
| messageFlux.map(message -> |
| MessageResult.acknowledge(message.getMessageId(), message.getValue()))) |
| .take(Duration.ofSeconds(2)) |
| // for demonstration |
| .subscribe(System.out::println); |
| ---- |
| |
| === Consuming messages using a message handler component with auto-acknowledgements |
| |
| [source,java] |
| ---- |
| ReactiveMessagePipeline reactiveMessagePipeline = |
| reactivePulsarClient |
| .messageConsumer(Schema.STRING) |
| .subscriptionName("sub") |
| .topic(topicName) |
| .build() |
| .messagePipeline() |
| .messageHandler(message -> Mono.fromRunnable(()->{ |
| System.out.println(message.getValue()); |
| })) |
| .build() |
| .start(); |
| // for demonstration |
| // the reactive message handler is running in the background, delay for 10 seconds |
| Thread.sleep(10000L); |
| // now stop the message handler component |
| reactiveMessagePipeline.stop(); |
| ---- |
| |
| == License |
| |
| Reactive client for Apache Pulsar is Open Source Software released under the link:www.apache.org/licenses/LICENSE-2.0[Apache Software License 2.0]. |
| |
| == How to Contribute |
| |
| The library is Apache 2.0 licensed. |
| |
| Contributions are welcome. Please discuss larger changes on the link:mailto:dev@pulsar.apache.org[Apache Pulsar dev mailing list]. There's a link:CONTRIBUTING.adoc[contributing guide] with more details. |
| |
| == Bugs and Feature Requests |
| |
| If you detect a bug or have a feature request or a good idea for Reactive client for Apache Pulsar, please link:${github}/issues/new[open a GitHub issue]. |
| |
| == Questions |
| |
| Please use https://stackoverflow.com/tags/reactive-pulsar[[reactive-pulsar\]] tag on Stackoverflow. https://stackoverflow.com/questions/ask?tags=apache-pulsar,reactive-pulsar[Ask a question now]. |