Add untilStarted & untilStopped to ReactiveMessagePipeline (#214)
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 05282e7..2fd4c57 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -52,6 +52,7 @@
mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" }
pulsar-client-api = { module = "org.apache.pulsar:pulsar-client-api", version.ref = "pulsar" }
pulsar-client-shaded = { module = "org.apache.pulsar:pulsar-client", version.ref = "pulsar" }
+pulsar-client-all = { module = "org.apache.pulsar:pulsar-client-all", version.ref = "pulsar" }
rat-gradle = { module = "org.nosphere.apache:creadur-rat-gradle", version.ref = "rat-gradle" }
reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "reactor" }
reactor-test = { module = "io.projectreactor:reactor-test", version.ref = "reactor" }
diff --git a/pulsar-client-reactive-adapter/build.gradle b/pulsar-client-reactive-adapter/build.gradle
index 984775a..37e6774 100644
--- a/pulsar-client-reactive-adapter/build.gradle
+++ b/pulsar-client-reactive-adapter/build.gradle
@@ -35,6 +35,7 @@
testImplementation libs.bundles.log4j
testImplementation libs.mockito.core
+ intTestImplementation libs.pulsar.client.all
intTestImplementation project(':pulsar-client-reactive-producer-cache-caffeine')
intTestImplementation project(path: ':pulsar-client-reactive-producer-cache-caffeine-shaded', configuration: 'shadow')
intTestImplementation libs.junit.jupiter
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
index 6c8dada..a93988d 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
@@ -36,8 +36,11 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.MessageSpecBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
@@ -94,6 +97,38 @@
}
}
+ @Test
+ void shouldSupportWaitingForConsumingToStartAndStop() throws Exception {
+ try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient();
+ PulsarAdmin pulsarAdmin = SingletonPulsarContainer.createPulsarAdmin()) {
+ String topicName = "test" + UUID.randomUUID();
+ ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
+ ReactiveMessagePipeline pipeline = reactivePulsarClient.messageConsumer(Schema.STRING)
+ .subscriptionName("sub")
+ .topic(topicName)
+ .build()
+ .messagePipeline()
+ .messageHandler((message) -> Mono.empty())
+ .build()
+ .start();
+
+ // wait for consuming to start
+ pipeline.untilStarted().block(Duration.ofSeconds(5));
+ // there should be an existing subscription
+ List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
+ assertThat(subscriptions).as("subscription should be created").contains("sub");
+
+ // stop the pipeline
+ pipeline.stop();
+ // and wait for it to stop
+ pipeline.untilStopped().block(Duration.ofSeconds(5));
+ // there should be no consumers
+ TopicStats topicStats = pulsarAdmin.topics().getStats(topicName);
+ SubscriptionStats subStats = topicStats.getSubscriptions().get("sub");
+ assertThat(subStats.getConsumers()).isEmpty();
+ }
+ }
+
@ParameterizedTest
@EnumSource(MessageOrderScenario.class)
void shouldRetainMessageOrder(MessageOrderScenario messageOrderScenario) throws Exception {
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
index 431e448..d53e185 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.reactive.client.adapter;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.testcontainers.containers.PulsarContainer;
@@ -44,6 +45,12 @@
.build();
}
+ static PulsarAdmin createPulsarAdmin() throws PulsarClientException {
+ return PulsarAdmin.builder()
+ .serviceHttpUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getHttpServiceUrl())
+ .build();
+ }
+
static DockerImageName getPulsarImage() {
return DockerImageName.parse("apachepulsar/pulsar:4.0.4");
}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
index 8e89a09..e66ba28 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
@@ -25,6 +25,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
@@ -45,12 +46,20 @@
}
private Mono<Consumer<T>> createConsumerMono() {
- return AdapterImplementationFactory.adaptPulsarFuture(
- () -> this.consumerBuilderFactory.apply(this.pulsarClientSupplier.get()).subscribeAsync());
+ return Mono.deferContextual((contextView) -> AdapterImplementationFactory
+ .adaptPulsarFuture(
+ () -> this.consumerBuilderFactory.apply(this.pulsarClientSupplier.get()).subscribeAsync())
+ .doOnSuccess((consumer) -> contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
+ .ifPresent((listener) -> listener.onConsumerCreated(consumer))));
}
private Mono<Void> closeConsumer(Consumer<?> consumer) {
- return Mono.fromFuture(consumer::closeAsync).doOnSuccess((__) -> this.LOG.info("Consumer closed {}", consumer));
+ return Mono.deferContextual((contextView) -> Mono.fromFuture(consumer::closeAsync).doFinally((signalType) -> {
+ this.LOG.info("Consumer closed {}", consumer);
+ contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
+ .ifPresent((listener) -> listener.onConsumerClosed(consumer));
+ }));
+
}
<R> Mono<R> usingConsumer(Function<Consumer<T>, Mono<R>> usingConsumerAction) {
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
index 47804a5..9613293 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
@@ -19,20 +19,26 @@
package org.apache.pulsar.reactive.client.api;
+import reactor.core.publisher.Mono;
+
/**
* Reactive message pipeline interface.
*/
public interface ReactiveMessagePipeline extends AutoCloseable {
/**
- * Starts the reactive pipeline.
- * @return the pipeline
+ * Starts the reactive pipeline asynchronously.
+ * @return the pipeline instance
+ * @see #untilStarted() For returning a reactive publisher (Mono) that completes after
+ * the pipeline has actually started.
*/
ReactiveMessagePipeline start();
/**
- * Stops the reactive pipeline.
- * @return the reactive pipeline
+ * Stops the reactive pipeline asynchronously.
+ * @return the pipeline instance
+ * @see #untilStopped() For returning a reactive publisher (Mono) that completes after
+ * the pipeline has actually stopped.
*/
ReactiveMessagePipeline stop();
@@ -43,11 +49,54 @@
boolean isRunning();
/**
- * Closes the reactive pipeline.
+ * Closes the reactive pipeline asynchronously without waiting for shutdown
+ * completion.
* @throws Exception if an error occurs
*/
default void close() throws Exception {
stop();
}
+ /**
+ * <p>
+ * Returns a reactive publisher (Mono) that completes after the pipeline has
+ * successfully subscribed to the input topic(s) and started consuming messages for
+ * the first time after pipeline creation. This method is not intended to be used
+ * after a pipeline restarts following failure. Use this method to wait for consumer
+ * and Pulsar subscription creation. This helps avoid race conditions when sending
+ * messages immediately after the pipeline starts.
+ * </p>
+ * <p>
+ * The {@link #start()} method must be called before invoking this method.
+ * </p>
+ * <p>
+ * To wait for the operation to complete synchronously, it is necessary to call
+ * {@link Mono#block()} on the returned Mono.
+ * </p>
+ * @return a Mono that completes after the pipeline has created its underlying Pulsar
+ * consumer
+ */
+ default Mono<Void> untilStarted() {
+ return Mono.empty();
+ }
+
+ /**
+ * <p>
+ * Returns a reactive publisher (Mono) that completes after the pipeline has closed
+ * the underlying Pulsar consumer and stopped consuming new messages.
+ * </p>
+ * <p>
+ * The {@link #stop()} method must be called before invoking this method.
+ * </p>
+ * <p>
+ * To wait for the operation to complete synchronously, it is necessary to call
+ * {@link Mono#block()} on the returned Mono.
+ * </p>
+ * @return a Mono that completes when the pipeline has closed the underlying Pulsar
+ * consumer
+ */
+ default Mono<Void> untilStopped() {
+ return Mono.empty();
+ }
+
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
index cd94f19..eefdcf2 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
@@ -21,6 +21,7 @@
import java.time.Duration;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
@@ -67,6 +68,10 @@
private final MessageGroupingFunction groupingFunction;
+ private final AtomicReference<InternalConsumerListenerImpl> consumerListener = new AtomicReference<>();
+
+ private final AtomicReference<CompletableFuture<Void>> pipelineStoppedFuture = new AtomicReference<>();
+
DefaultReactiveMessagePipeline(ReactiveMessageConsumer<T> messageConsumer,
Function<Message<T>, Publisher<Void>> messageHandler, BiConsumer<Message<T>, Throwable> errorLogger,
Retry pipelineRetrySpec, Duration handlingTimeout, Function<Mono<Void>, Publisher<Void>> transformer,
@@ -83,7 +88,14 @@
this.pipeline = messageConsumer.consumeMany(this::createMessageConsumer)
.then()
.transform(transformer)
- .transform(this::decoratePipeline);
+ .transform(this::decoratePipeline)
+ .doFinally((signalType) -> {
+ CompletableFuture<Void> f = this.pipelineStoppedFuture.get();
+ if (f != null) {
+ f.complete(null);
+ }
+ })
+ .doFirst(() -> this.pipelineStoppedFuture.set(new CompletableFuture<>()));
}
private Mono<Void> decorateMessageHandler(Mono<Void> messageHandler) {
@@ -168,14 +180,26 @@
if (this.killSwitch.get() != null) {
throw new IllegalStateException("Message handler is already running.");
}
- Disposable disposable = this.pipeline.subscribe(null, this::logError, this::logUnexpectedCompletion);
+ InternalConsumerListenerImpl consumerListener = new InternalConsumerListenerImpl();
+ Disposable disposable = this.pipeline.contextWrite(Context.of(InternalConsumerListener.class, consumerListener))
+ .subscribe(null, this::logError, this::logUnexpectedCompletion);
if (!this.killSwitch.compareAndSet(null, disposable)) {
disposable.dispose();
throw new IllegalStateException("Message handler was already running.");
}
+ this.consumerListener.set(consumerListener);
return this;
}
+ @Override
+ public Mono<Void> untilStarted() {
+ if (!isRunning()) {
+ throw new IllegalStateException("Pipeline isn't running. Call start first.");
+ }
+ InternalConsumerListenerImpl internalConsumerListener = this.consumerListener.get();
+ return internalConsumerListener.waitForConsumerCreated();
+ }
+
private void logError(Throwable throwable) {
LOG.error("ReactiveMessageHandler was unexpectedly terminated.", throwable);
}
@@ -196,8 +220,43 @@
}
@Override
+ public Mono<Void> untilStopped() {
+ if (isRunning()) {
+ throw new IllegalStateException("Pipeline is running. Call stop first.");
+ }
+ CompletableFuture<Void> f = this.pipelineStoppedFuture.get();
+ if (f != null) {
+ return Mono.fromFuture(f, true);
+ }
+ else {
+ return Mono.empty();
+ }
+ }
+
+ @Override
public boolean isRunning() {
return this.killSwitch.get() != null;
}
+ private static final class InternalConsumerListenerImpl implements InternalConsumerListener {
+
+ private final CompletableFuture<Void> createdFuture;
+
+ private InternalConsumerListenerImpl() {
+ this.createdFuture = new CompletableFuture<>();
+ }
+
+ @Override
+ public void onConsumerCreated(Object nativeConsumer) {
+ if (!this.createdFuture.isDone()) {
+ this.createdFuture.complete(null);
+ }
+ }
+
+ Mono<Void> waitForConsumerCreated() {
+ return Mono.fromFuture(this.createdFuture, true);
+ }
+
+ }
+
}
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java
new file mode 100644
index 0000000..8ee8f4b
--- /dev/null
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.internal.api;
+
+/**
+ * Internal interface to signal the creation and closing of a native consumer. This is not
+ * to be intended to be used by applications.
+ */
+public interface InternalConsumerListener {
+
+ /**
+ * Called when a new native consumer is created. This is called each time a new
+ * consumer is created initially or as a result of a reactive pipeline retry.
+ * @param nativeConsumer the native consumer instance
+ */
+ default void onConsumerCreated(Object nativeConsumer) {
+ // no-op
+ }
+
+ /**
+ * Called when a native consumer is closed. This is called each time a consumer is
+ * closed as a result of a reactive pipeline retry or when the pipeline is closed.
+ * @param nativeConsumer the native consumer instance
+ */
+ default void onConsumerClosed(Object nativeConsumer) {
+ // no-op
+ }
+
+}
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
index 3415481..f330a54 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
@@ -28,6 +28,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -38,6 +39,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -49,6 +51,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
class ReactiveMessagePipelineTests {
@@ -165,6 +168,27 @@
}
@Test
+ void pipelineUntilStartedAndStopped() throws Exception {
+ int numMessages = 10;
+ Duration subscriptionDelay = Duration.ofSeconds(1);
+ TestConsumer testConsumer = new TestConsumer(numMessages, subscriptionDelay);
+ CountDownLatch latch = new CountDownLatch(numMessages);
+ Function<Message<String>, Publisher<Void>> messageHandler = (
+ message) -> Mono.empty().then().doFinally((__) -> latch.countDown());
+ ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler).build();
+ pipeline.start();
+ // timeout should occur since subscription delay is 1 second in TestConsumer
+ assertThatThrownBy(() -> pipeline.untilStarted().block(Duration.ofMillis(100)))
+ .isInstanceOf(IllegalStateException.class)
+ .hasCauseInstanceOf(TimeoutException.class);
+ // now wait for consuming to start
+ pipeline.untilStarted().block(Duration.ofSeconds(2));
+ assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+ // now wait for consuming to stop
+ pipeline.stop().untilStopped().block(Duration.ofSeconds(1));
+ }
+
+ @Test
void streamingHandler() throws Exception {
int numMessages = 10;
TestConsumer testConsumer = new TestConsumer(numMessages);
@@ -480,10 +504,17 @@
private final int numMessages;
+ private final Duration subscriptionDelay;
+
private volatile Runnable finishedCallback;
TestConsumer(int numMessages) {
+ this(numMessages, null);
+ }
+
+ TestConsumer(int numMessages, Duration subscriptionDelay) {
this.numMessages = numMessages;
+ this.subscriptionDelay = subscriptionDelay;
}
private final List<MessageId> acknowledgedMessages = new CopyOnWriteArrayList<>();
@@ -496,7 +527,10 @@
@Override
public <R> Flux<R> consumeMany(Function<Flux<Message<String>>, Publisher<MessageResult<R>>> messageHandler) {
- return Flux.defer(() -> {
+ Flux<R> flux = Flux.deferContextual((contextView) -> {
+ Optional<InternalConsumerListener> internalConsumerListener = contextView
+ .getOrEmpty(InternalConsumerListener.class);
+ internalConsumerListener.ifPresent((listener) -> listener.onConsumerCreated(this));
Flux<Message<String>> messages = Flux.range(0, this.numMessages)
.map(Object::toString)
.map(TestMessage::new);
@@ -511,8 +545,15 @@
if (this.finishedCallback != null) {
this.finishedCallback.run();
}
+ internalConsumerListener.ifPresent((listener) -> listener.onConsumerClosed(this));
});
});
+ if (this.subscriptionDelay != null) {
+ return flux.delaySubscription(this.subscriptionDelay);
+ }
+ else {
+ return flux;
+ }
}
List<MessageId> getAcknowledgedMessages() {