Add untilStarted & untilStopped to ReactiveMessagePipeline (#214)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 05282e7..2fd4c57 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -52,6 +52,7 @@
 mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" }
 pulsar-client-api = { module = "org.apache.pulsar:pulsar-client-api", version.ref = "pulsar" }
 pulsar-client-shaded = { module = "org.apache.pulsar:pulsar-client", version.ref = "pulsar" }
+pulsar-client-all = { module = "org.apache.pulsar:pulsar-client-all", version.ref = "pulsar" }
 rat-gradle = { module = "org.nosphere.apache:creadur-rat-gradle", version.ref = "rat-gradle" }
 reactor-core = { module = "io.projectreactor:reactor-core", version.ref = "reactor" }
 reactor-test = { module = "io.projectreactor:reactor-test", version.ref = "reactor" }
diff --git a/pulsar-client-reactive-adapter/build.gradle b/pulsar-client-reactive-adapter/build.gradle
index 984775a..37e6774 100644
--- a/pulsar-client-reactive-adapter/build.gradle
+++ b/pulsar-client-reactive-adapter/build.gradle
@@ -35,6 +35,7 @@
 	testImplementation libs.bundles.log4j
 	testImplementation libs.mockito.core
 
+	intTestImplementation libs.pulsar.client.all
 	intTestImplementation project(':pulsar-client-reactive-producer-cache-caffeine')
 	intTestImplementation project(path: ':pulsar-client-reactive-producer-cache-caffeine-shaded', configuration: 'shadow')
 	intTestImplementation libs.junit.jupiter
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
index 6c8dada..a93988d 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/ReactiveMessagePipelineE2ETests.java
@@ -36,8 +36,11 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.reactive.client.api.MessageSpec;
 import org.apache.pulsar.reactive.client.api.MessageSpecBuilder;
 import org.apache.pulsar.reactive.client.api.ReactiveMessagePipeline;
@@ -94,6 +97,38 @@
 		}
 	}
 
+	@Test
+	void shouldSupportWaitingForConsumingToStartAndStop() throws Exception {
+		try (PulsarClient pulsarClient = SingletonPulsarContainer.createPulsarClient();
+				PulsarAdmin pulsarAdmin = SingletonPulsarContainer.createPulsarAdmin()) {
+			String topicName = "test" + UUID.randomUUID();
+			ReactivePulsarClient reactivePulsarClient = AdaptedReactivePulsarClientFactory.create(pulsarClient);
+			ReactiveMessagePipeline pipeline = reactivePulsarClient.messageConsumer(Schema.STRING)
+				.subscriptionName("sub")
+				.topic(topicName)
+				.build()
+				.messagePipeline()
+				.messageHandler((message) -> Mono.empty())
+				.build()
+				.start();
+
+			// wait for consuming to start
+			pipeline.untilStarted().block(Duration.ofSeconds(5));
+			// there should be an existing subscription
+			List<String> subscriptions = pulsarAdmin.topics().getSubscriptions(topicName);
+			assertThat(subscriptions).as("subscription should be created").contains("sub");
+
+			// stop the pipeline
+			pipeline.stop();
+			// and wait for it to stop
+			pipeline.untilStopped().block(Duration.ofSeconds(5));
+			// there should be no consumers
+			TopicStats topicStats = pulsarAdmin.topics().getStats(topicName);
+			SubscriptionStats subStats = topicStats.getSubscriptions().get("sub");
+			assertThat(subStats.getConsumers()).isEmpty();
+		}
+	}
+
 	@ParameterizedTest
 	@EnumSource(MessageOrderScenario.class)
 	void shouldRetainMessageOrder(MessageOrderScenario messageOrderScenario) throws Exception {
diff --git a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
index 431e448..d53e185 100644
--- a/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
+++ b/pulsar-client-reactive-adapter/src/intTest/java/org/apache/pulsar/reactive/client/adapter/SingletonPulsarContainer.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.reactive.client.adapter;
 
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.testcontainers.containers.PulsarContainer;
@@ -44,6 +45,12 @@
 			.build();
 	}
 
+	static PulsarAdmin createPulsarAdmin() throws PulsarClientException {
+		return PulsarAdmin.builder()
+			.serviceHttpUrl(SingletonPulsarContainer.PULSAR_CONTAINER.getHttpServiceUrl())
+			.build();
+	}
+
 	static DockerImageName getPulsarImage() {
 		return DockerImageName.parse("apachepulsar/pulsar:4.0.4");
 	}
diff --git a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
index 8e89a09..e66ba28 100644
--- a/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
+++ b/pulsar-client-reactive-adapter/src/main/java/org/apache/pulsar/reactive/client/internal/adapter/ReactiveConsumerAdapter.java
@@ -25,6 +25,7 @@
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import reactor.core.publisher.Flux;
@@ -45,12 +46,20 @@
 	}
 
 	private Mono<Consumer<T>> createConsumerMono() {
-		return AdapterImplementationFactory.adaptPulsarFuture(
-				() -> this.consumerBuilderFactory.apply(this.pulsarClientSupplier.get()).subscribeAsync());
+		return Mono.deferContextual((contextView) -> AdapterImplementationFactory
+			.adaptPulsarFuture(
+					() -> this.consumerBuilderFactory.apply(this.pulsarClientSupplier.get()).subscribeAsync())
+			.doOnSuccess((consumer) -> contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
+				.ifPresent((listener) -> listener.onConsumerCreated(consumer))));
 	}
 
 	private Mono<Void> closeConsumer(Consumer<?> consumer) {
-		return Mono.fromFuture(consumer::closeAsync).doOnSuccess((__) -> this.LOG.info("Consumer closed {}", consumer));
+		return Mono.deferContextual((contextView) -> Mono.fromFuture(consumer::closeAsync).doFinally((signalType) -> {
+			this.LOG.info("Consumer closed {}", consumer);
+			contextView.<InternalConsumerListener>getOrEmpty(InternalConsumerListener.class)
+				.ifPresent((listener) -> listener.onConsumerClosed(consumer));
+		}));
+
 	}
 
 	<R> Mono<R> usingConsumer(Function<Consumer<T>, Mono<R>> usingConsumerAction) {
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
index 47804a5..9613293 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipeline.java
@@ -19,20 +19,26 @@
 
 package org.apache.pulsar.reactive.client.api;
 
+import reactor.core.publisher.Mono;
+
 /**
  * Reactive message pipeline interface.
  */
 public interface ReactiveMessagePipeline extends AutoCloseable {
 
 	/**
-	 * Starts the reactive pipeline.
-	 * @return the pipeline
+	 * Starts the reactive pipeline asynchronously.
+	 * @return the pipeline instance
+	 * @see #untilStarted() For returning a reactive publisher (Mono) that completes after
+	 * the pipeline has actually started.
 	 */
 	ReactiveMessagePipeline start();
 
 	/**
-	 * Stops the reactive pipeline.
-	 * @return the reactive pipeline
+	 * Stops the reactive pipeline asynchronously.
+	 * @return the pipeline instance
+	 * @see #untilStopped() For returning a reactive publisher (Mono) that completes after
+	 * the pipeline has actually stopped.
 	 */
 	ReactiveMessagePipeline stop();
 
@@ -43,11 +49,54 @@
 	boolean isRunning();
 
 	/**
-	 * Closes the reactive pipeline.
+	 * Closes the reactive pipeline asynchronously without waiting for shutdown
+	 * completion.
 	 * @throws Exception if an error occurs
 	 */
 	default void close() throws Exception {
 		stop();
 	}
 
+	/**
+	 * <p>
+	 * Returns a reactive publisher (Mono) that completes after the pipeline has
+	 * successfully subscribed to the input topic(s) and started consuming messages for
+	 * the first time after pipeline creation. This method is not intended to be used
+	 * after a pipeline restarts following failure. Use this method to wait for consumer
+	 * and Pulsar subscription creation. This helps avoid race conditions when sending
+	 * messages immediately after the pipeline starts.
+	 * </p>
+	 * <p>
+	 * The {@link #start()} method must be called before invoking this method.
+	 * </p>
+	 * <p>
+	 * To wait for the operation to complete synchronously, it is necessary to call
+	 * {@link Mono#block()} on the returned Mono.
+	 * </p>
+	 * @return a Mono that completes after the pipeline has created its underlying Pulsar
+	 * consumer
+	 */
+	default Mono<Void> untilStarted() {
+		return Mono.empty();
+	}
+
+	/**
+	 * <p>
+	 * Returns a reactive publisher (Mono) that completes after the pipeline has closed
+	 * the underlying Pulsar consumer and stopped consuming new messages.
+	 * </p>
+	 * <p>
+	 * The {@link #stop()} method must be called before invoking this method.
+	 * </p>
+	 * <p>
+	 * To wait for the operation to complete synchronously, it is necessary to call
+	 * {@link Mono#block()} on the returned Mono.
+	 * </p>
+	 * @return a Mono that completes when the pipeline has closed the underlying Pulsar
+	 * consumer
+	 */
+	default Mono<Void> untilStopped() {
+		return Mono.empty();
+	}
+
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
index cd94f19..eefdcf2 100644
--- a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/DefaultReactiveMessagePipeline.java
@@ -21,6 +21,7 @@
 
 import java.time.Duration;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
@@ -67,6 +68,10 @@
 
 	private final MessageGroupingFunction groupingFunction;
 
+	private final AtomicReference<InternalConsumerListenerImpl> consumerListener = new AtomicReference<>();
+
+	private final AtomicReference<CompletableFuture<Void>> pipelineStoppedFuture = new AtomicReference<>();
+
 	DefaultReactiveMessagePipeline(ReactiveMessageConsumer<T> messageConsumer,
 			Function<Message<T>, Publisher<Void>> messageHandler, BiConsumer<Message<T>, Throwable> errorLogger,
 			Retry pipelineRetrySpec, Duration handlingTimeout, Function<Mono<Void>, Publisher<Void>> transformer,
@@ -83,7 +88,14 @@
 		this.pipeline = messageConsumer.consumeMany(this::createMessageConsumer)
 			.then()
 			.transform(transformer)
-			.transform(this::decoratePipeline);
+			.transform(this::decoratePipeline)
+			.doFinally((signalType) -> {
+				CompletableFuture<Void> f = this.pipelineStoppedFuture.get();
+				if (f != null) {
+					f.complete(null);
+				}
+			})
+			.doFirst(() -> this.pipelineStoppedFuture.set(new CompletableFuture<>()));
 	}
 
 	private Mono<Void> decorateMessageHandler(Mono<Void> messageHandler) {
@@ -168,14 +180,26 @@
 		if (this.killSwitch.get() != null) {
 			throw new IllegalStateException("Message handler is already running.");
 		}
-		Disposable disposable = this.pipeline.subscribe(null, this::logError, this::logUnexpectedCompletion);
+		InternalConsumerListenerImpl consumerListener = new InternalConsumerListenerImpl();
+		Disposable disposable = this.pipeline.contextWrite(Context.of(InternalConsumerListener.class, consumerListener))
+			.subscribe(null, this::logError, this::logUnexpectedCompletion);
 		if (!this.killSwitch.compareAndSet(null, disposable)) {
 			disposable.dispose();
 			throw new IllegalStateException("Message handler was already running.");
 		}
+		this.consumerListener.set(consumerListener);
 		return this;
 	}
 
+	@Override
+	public Mono<Void> untilStarted() {
+		if (!isRunning()) {
+			throw new IllegalStateException("Pipeline isn't running. Call start first.");
+		}
+		InternalConsumerListenerImpl internalConsumerListener = this.consumerListener.get();
+		return internalConsumerListener.waitForConsumerCreated();
+	}
+
 	private void logError(Throwable throwable) {
 		LOG.error("ReactiveMessageHandler was unexpectedly terminated.", throwable);
 	}
@@ -196,8 +220,43 @@
 	}
 
 	@Override
+	public Mono<Void> untilStopped() {
+		if (isRunning()) {
+			throw new IllegalStateException("Pipeline is running. Call stop first.");
+		}
+		CompletableFuture<Void> f = this.pipelineStoppedFuture.get();
+		if (f != null) {
+			return Mono.fromFuture(f, true);
+		}
+		else {
+			return Mono.empty();
+		}
+	}
+
+	@Override
 	public boolean isRunning() {
 		return this.killSwitch.get() != null;
 	}
 
+	private static final class InternalConsumerListenerImpl implements InternalConsumerListener {
+
+		private final CompletableFuture<Void> createdFuture;
+
+		private InternalConsumerListenerImpl() {
+			this.createdFuture = new CompletableFuture<>();
+		}
+
+		@Override
+		public void onConsumerCreated(Object nativeConsumer) {
+			if (!this.createdFuture.isDone()) {
+				this.createdFuture.complete(null);
+			}
+		}
+
+		Mono<Void> waitForConsumerCreated() {
+			return Mono.fromFuture(this.createdFuture, true);
+		}
+
+	}
+
 }
diff --git a/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java
new file mode 100644
index 0000000..8ee8f4b
--- /dev/null
+++ b/pulsar-client-reactive-api/src/main/java/org/apache/pulsar/reactive/client/internal/api/InternalConsumerListener.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.reactive.client.internal.api;
+
+/**
+ * Internal interface to signal the creation and closing of a native consumer. This is not
+ * to be intended to be used by applications.
+ */
+public interface InternalConsumerListener {
+
+	/**
+	 * Called when a new native consumer is created. This is called each time a new
+	 * consumer is created initially or as a result of a reactive pipeline retry.
+	 * @param nativeConsumer the native consumer instance
+	 */
+	default void onConsumerCreated(Object nativeConsumer) {
+		// no-op
+	}
+
+	/**
+	 * Called when a native consumer is closed. This is called each time a consumer is
+	 * closed as a result of a reactive pipeline retry or when the pipeline is closed.
+	 * @param nativeConsumer the native consumer instance
+	 */
+	default void onConsumerClosed(Object nativeConsumer) {
+		// no-op
+	}
+
+}
diff --git a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
index 3415481..f330a54 100644
--- a/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
+++ b/pulsar-client-reactive-api/src/test/java/org/apache/pulsar/reactive/client/api/ReactiveMessagePipelineTests.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -38,6 +39,7 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.internal.DefaultImplementation;
 import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.reactive.client.internal.api.InternalConsumerListener;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -49,6 +51,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
 import static org.assertj.core.api.Assertions.assertThatNullPointerException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 class ReactiveMessagePipelineTests {
 
@@ -165,6 +168,27 @@
 	}
 
 	@Test
+	void pipelineUntilStartedAndStopped() throws Exception {
+		int numMessages = 10;
+		Duration subscriptionDelay = Duration.ofSeconds(1);
+		TestConsumer testConsumer = new TestConsumer(numMessages, subscriptionDelay);
+		CountDownLatch latch = new CountDownLatch(numMessages);
+		Function<Message<String>, Publisher<Void>> messageHandler = (
+				message) -> Mono.empty().then().doFinally((__) -> latch.countDown());
+		ReactiveMessagePipeline pipeline = testConsumer.messagePipeline().messageHandler(messageHandler).build();
+		pipeline.start();
+		// timeout should occur since subscription delay is 1 second in TestConsumer
+		assertThatThrownBy(() -> pipeline.untilStarted().block(Duration.ofMillis(100)))
+			.isInstanceOf(IllegalStateException.class)
+			.hasCauseInstanceOf(TimeoutException.class);
+		// now wait for consuming to start
+		pipeline.untilStarted().block(Duration.ofSeconds(2));
+		assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
+		// now wait for consuming to stop
+		pipeline.stop().untilStopped().block(Duration.ofSeconds(1));
+	}
+
+	@Test
 	void streamingHandler() throws Exception {
 		int numMessages = 10;
 		TestConsumer testConsumer = new TestConsumer(numMessages);
@@ -480,10 +504,17 @@
 
 		private final int numMessages;
 
+		private final Duration subscriptionDelay;
+
 		private volatile Runnable finishedCallback;
 
 		TestConsumer(int numMessages) {
+			this(numMessages, null);
+		}
+
+		TestConsumer(int numMessages, Duration subscriptionDelay) {
 			this.numMessages = numMessages;
+			this.subscriptionDelay = subscriptionDelay;
 		}
 
 		private final List<MessageId> acknowledgedMessages = new CopyOnWriteArrayList<>();
@@ -496,7 +527,10 @@
 
 		@Override
 		public <R> Flux<R> consumeMany(Function<Flux<Message<String>>, Publisher<MessageResult<R>>> messageHandler) {
-			return Flux.defer(() -> {
+			Flux<R> flux = Flux.deferContextual((contextView) -> {
+				Optional<InternalConsumerListener> internalConsumerListener = contextView
+					.getOrEmpty(InternalConsumerListener.class);
+				internalConsumerListener.ifPresent((listener) -> listener.onConsumerCreated(this));
 				Flux<Message<String>> messages = Flux.range(0, this.numMessages)
 					.map(Object::toString)
 					.map(TestMessage::new);
@@ -511,8 +545,15 @@
 					if (this.finishedCallback != null) {
 						this.finishedCallback.run();
 					}
+					internalConsumerListener.ifPresent((listener) -> listener.onConsumerClosed(this));
 				});
 			});
+			if (this.subscriptionDelay != null) {
+				return flux.delaySubscription(this.subscriptionDelay);
+			}
+			else {
+				return flux;
+			}
 		}
 
 		List<MessageId> getAcknowledgedMessages() {