| //// |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| https://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| //// |
| = Reactive client for Apache Pulsar |
| |
| :github: https://github.com/apache/pulsar-client-reactive |
| :latest_version: 0.1.0 |
| |
| A reactive client for Apache Pulsar that is compatible with the Reactive Streams specification. |
| It uses Project Reactor as the Reactive Streams implementation. |
| |
| == Getting it |
| |
| *This library requires Java 8 or later to run*. |
| |
| With Gradle: |
| |
| [source,groovy,subs="verbatim,attributes"] |
| ---- |
| dependencies { |
|     implementation "org.apache.pulsar:pulsar-client-reactive-adapter:{latest_version}" |
| } |
| ---- |
| |
| With Maven: |
| |
| [source,xml,subs="verbatim,attributes"] |
| ---- |
| <dependencies> |
|     <dependency> |
|         <groupId>org.apache.pulsar</groupId> |
|         <artifactId>pulsar-client-reactive-adapter</artifactId> |
|         <version>{latest_version}</version> |
|     </dependency> |
| </dependencies> |
| ---- |
| |
| == Usage |
| |
| === Initializing the library |
| |
| ==== In a standalone application |
| |
| Using an existing PulsarClient instance: |
| |
| [source,java] |
| ---- |
| ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient); |
| ---- |
| |
| === Sending messages |
| |
| [source,java] |
| ---- |
| ReactiveMessageSender<String> messageSender = reactivePulsarClient |
|     .messageSender(Schema.STRING) |
|     .topic(topicName) |
|     .maxInflight(100) |
|     .build(); |
| Mono<MessageId> messageId = messageSender |
|     .sendOne(MessageSpec.of("Hello world!")); |
| // for demonstration |
| messageId.subscribe(System.out::println); |
| ---- |
| |
| === Sending messages with cached producer |
| |
| By default, a `ConcurrentHashMap`-based cache is used. A more advanced cache based on Caffeine is recommended; it is used as the default implementation when it is on the classpath. |
| |
| Adding the Caffeine-based producer cache with Gradle: |
| |
| [source,groovy,subs="verbatim,attributes"] |
| ---- |
| dependencies { |
|     implementation "org.apache.pulsar:pulsar-client-reactive-adapter:{latest_version}" |
|     implementation "org.apache.pulsar:pulsar-client-reactive-producer-cache-caffeine:{latest_version}" |
| } |
| ---- |
| |
| Adding the Caffeine-based producer cache with Maven: |
| |
| [source,xml,subs="verbatim,attributes"] |
| ---- |
| <dependencies> |
|     <dependency> |
|         <groupId>org.apache.pulsar</groupId> |
|         <artifactId>pulsar-client-reactive-adapter</artifactId> |
|         <version>{latest_version}</version> |
|     </dependency> |
|     <dependency> |
|         <groupId>org.apache.pulsar</groupId> |
|         <artifactId>pulsar-client-reactive-producer-cache-caffeine</artifactId> |
|         <version>{latest_version}</version> |
|     </dependency> |
| </dependencies> |
| ---- |
| |
| Usage example of the cache: |
| |
| [source,java] |
| ---- |
| ReactiveMessageSender<String> messageSender = reactivePulsarClient |
|     .messageSender(Schema.STRING) |
|     .cache(AdaptedReactivePulsarClientFactory.createCache()) |
|     .topic(topicName) |
|     .maxInflight(100) |
|     .build(); |
| Mono<MessageId> messageId = messageSender |
|     .sendOne(MessageSpec.of("Hello world!")); |
| // for demonstration |
| messageId.subscribe(System.out::println); |
| ---- |
| |
| Using a cached producer is recommended in most cases. The cache reuses the Pulsar Producer instance and related resources across multiple message sending calls. |
| This improves performance because a producer does not have to be created before, and closed after, each message is sent. |
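
The reuse idea described above can be sketched in plain Java. This is only an illustration of caching one instance per topic, not the library's actual cache: the class `ProducerCacheSketch`, its methods, and the string stand-in for a producer are all hypothetical names introduced here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical sketch: NOT the pulsar-client-reactive cache implementation.
// It only illustrates reusing one "producer" per topic instead of creating one per send.
class ProducerCacheSketch<P> {
    private final Map<String, P> producersByTopic = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();
    private final Function<String, P> producerFactory;

    ProducerCacheSketch(Function<String, P> producerFactory) {
        this.producerFactory = producerFactory;
    }

    // Returns the cached producer for the topic, creating it only on first use.
    P getOrCreate(String topic) {
        return producersByTopic.computeIfAbsent(topic, t -> {
            created.incrementAndGet();
            return producerFactory.apply(t);
        });
    }

    // Number of producers that were actually created.
    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        // A String stands in for a real producer in this sketch.
        ProducerCacheSketch<String> cache =
                new ProducerCacheSketch<>(topic -> "producer-for-" + topic);
        for (int i = 0; i < 3; i++) {
            cache.getOrCreate("my-topic"); // three sends, one producer
        }
        cache.getOrCreate("other-topic");
        System.out.println(cache.createdCount()); // prints 2
    }
}
```

Three sends to the same topic create a single producer in this sketch. A real cache, such as the Caffeine-based one, would presumably also handle eviction and closing of idle producers, which is omitted here.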
| |
| Together with the cache, the adapter library also enables reactive backpressure for sending messages. The `maxInflight` setting limits the number of messages that are pending from the client to the broker. The implementation limits Reactive Streams subscription requests to keep the number of pending messages under the defined limit. This limit is per topic and applies to the local JVM only. |
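
To make the `maxInflight` behaviour more concrete, here is a minimal plain-Java sketch of demand limiting. It is an assumption about how such a limit could work, not the adapter's code: `InflightLimiterSketch`, `reserveDemand` and `onSendCompleted` are hypothetical names, and a real implementation would pass the returned demand to `Subscription.request(n)`.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: NOT the adapter's implementation.
// It tracks pending messages and only hands out as much upstream demand as there are free slots.
class InflightLimiterSketch {
    private final int maxInflight;
    private final AtomicInteger pending = new AtomicInteger();

    InflightLimiterSketch(int maxInflight) {
        this.maxInflight = maxInflight;
    }

    // Reserves all free slots and returns how many messages may be requested upstream.
    int reserveDemand() {
        while (true) {
            int current = pending.get();
            int free = maxInflight - current;
            if (free <= 0) {
                return 0; // limit reached: request nothing until a send completes
            }
            if (pending.compareAndSet(current, current + free)) {
                return free;
            }
        }
    }

    // Called when one send has completed; frees one slot.
    void onSendCompleted() {
        pending.decrementAndGet();
    }

    int pendingCount() {
        return pending.get();
    }

    public static void main(String[] args) {
        InflightLimiterSketch limiter = new InflightLimiterSketch(3);
        System.out.println(limiter.reserveDemand()); // prints 3
        System.out.println(limiter.reserveDemand()); // prints 0
        limiter.onSendCompleted();
        System.out.println(limiter.reserveDemand()); // prints 1
    }
}
```

In this sketch the number of reserved slots never exceeds `maxInflight`, so upstream publishers are slowed down rather than messages being buffered without bound.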
| |
| === Reading messages |
| |
| Reading all messages for a topic: |
| |
| [source,java] |
| ---- |
| ReactiveMessageReader<String> messageReader = |
|     reactivePulsarClient.messageReader(Schema.STRING) |
|         .topic(topicName) |
|         .build(); |
| messageReader.readMany() |
|     .map(Message::getValue) |
|     // for demonstration |
|     .subscribe(System.out::println); |
| ---- |
| |
| By default, the stream will complete when the tail of the topic is reached. |
| |
| ==== Example: poll for up to 5 new messages and stop polling when a timeout occurs |
| |
| With `.endOfStreamAction(EndOfStreamAction.POLL)`, the reader polls for new messages when it reaches the end of the topic instead of completing the stream. |
| |
| [source,java] |
| ---- |
| ReactiveMessageReader<String> messageReader = |
|     reactivePulsarClient.messageReader(Schema.STRING) |
|         .topic(topicName) |
|         .startAtSpec(StartAtSpec.ofLatest()) |
|         .endOfStreamAction(EndOfStreamAction.POLL) |
|         .build(); |
| messageReader.readMany() |
|     .take(Duration.ofSeconds(5)) |
|     .take(5) |
|     // for demonstration |
|     .subscribe(System.out::println); |
| ---- |
| |
| === Consuming messages |
| |
| [source,java] |
| ---- |
| ReactiveMessageConsumer<String> messageConsumer = |
|     reactivePulsarClient.messageConsumer(Schema.STRING) |
|         .topic(topicName) |
|         .subscriptionName("sub") |
|         .build(); |
| messageConsumer.consumeMany(messageFlux -> |
|         messageFlux.map(message -> |
|             MessageResult.acknowledge(message.getMessageId(), message.getValue()))) |
|     .take(Duration.ofSeconds(2)) |
|     // for demonstration |
|     .subscribe(System.out::println); |
| ---- |
| |
| === Consuming messages using a message handler component with auto-acknowledgements |
| |
| [source,java] |
| ---- |
| ReactiveMessagePipeline reactiveMessagePipeline = |
|     reactivePulsarClient |
|         .messageConsumer(Schema.STRING) |
|         .subscriptionName("sub") |
|         .topic(topicName) |
|         .build() |
|         .messagePipeline() |
|         .messageHandler(message -> Mono.fromRunnable(() -> { |
|             System.out.println(message.getValue()); |
|         })) |
|         .build() |
|         .start(); |
| // for demonstration |
| // the reactive message handler is running in the background, delay for 10 seconds |
| Thread.sleep(10000L); |
| // now stop the message handler component |
| reactiveMessagePipeline.stop(); |
| ---- |
| |
| == License |
| |
| Reactive client for Apache Pulsar is Open Source Software released under the https://www.apache.org/licenses/LICENSE-2.0[Apache License 2.0]. |
| |
| == How to Contribute |
| |
| The library is Apache 2.0 licensed. |
| |
| Contributions are welcome. Please discuss larger changes on the link:mailto:dev@pulsar.apache.org[Apache Pulsar dev mailing list]. There's a link:CONTRIBUTING.adoc[contributing guide] with more details. |
| |
| == Bugs and Feature Requests |
| |
| If you find a bug or have a feature request or an idea for the Reactive client for Apache Pulsar, please link:{github}/issues/new[open a GitHub issue]. |
| |
| == Questions |
| |
| Please use the https://stackoverflow.com/tags/reactive-pulsar[[reactive-pulsar\]] tag on Stack Overflow. https://stackoverflow.com/questions/ask?tags=apache-pulsar,reactive-pulsar[Ask a question now]. |