Add tests to verify that Pulsar Exceptions are propagated as-is (#118)

diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTest.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTest.java
index 5fb6c74..1218ce4 100644
--- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTest.java
+++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTest.java
@@ -39,6 +39,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -327,4 +328,30 @@
 		verify(consumer).acknowledgeAsync(eq(MessageId.latest));
 	}
 
+	@Test
+	void consumePulsarException() throws Exception {
+		PulsarClientImpl pulsarClient = spy(
+				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
+
+		Consumer<String> consumer = mock(Consumer.class);
+		doReturn(CompletableFuture.completedFuture(null)).when(consumer).closeAsync();
+
+		CompletableFuture<String> failedFuture = new CompletableFuture<>();
+		failedFuture.completeExceptionally(new PulsarClientException.InvalidMessageException("test"));
+		doReturn(failedFuture).when(consumer).receiveAsync();
+
+		doReturn(CompletableFuture.completedFuture(consumer)).when(pulsarClient)
+				.subscribeAsync(any(ConsumerConfigurationData.class), eq(Schema.STRING), isNull());
+
+		ReactiveMessageConsumer<String> reactiveConsumer = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+				.messageConsumer(Schema.STRING).topic("my-topic").subscriptionName("my-sub").build();
+
+		StepVerifier.create(reactiveConsumer.consumeOne((message) -> Mono.just(MessageResult.acknowledge(message))))
+				.verifyError(PulsarClientException.InvalidMessageException.class);
+
+		StepVerifier
+				.create(reactiveConsumer.consumeMany((messages) -> messages.map(MessageResult::acknowledgeAndReturn)))
+				.verifyError(PulsarClientException.InvalidMessageException.class);
+	}
+
 }
diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderTest.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderTest.java
index c10b7fe..047fbd7 100644
--- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderTest.java
+++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderTest.java
@@ -29,6 +29,7 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
@@ -129,6 +130,29 @@
 	}
 
 	@Test
+	void readPulsarException() throws Exception {
+		PulsarClientImpl pulsarClient = spy(
+				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
+
+		Reader<String> reader = mock(Reader.class);
+		doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
+		doReturn(CompletableFuture.completedFuture(true)).when(reader).hasMessageAvailableAsync();
+
+		CompletableFuture<String> failedFuture = new CompletableFuture<>();
+		failedFuture.completeExceptionally(new PulsarClientException.InvalidMessageException("test"));
+		doReturn(failedFuture).when(reader).readNextAsync();
+
+		doReturn(CompletableFuture.completedFuture(reader)).when(pulsarClient).createReaderAsync(any(),
+				eq(Schema.STRING));
+
+		ReactiveMessageReader<String> reactiveReader = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+				.messageReader(Schema.STRING).topic("my-topic").build();
+
+		StepVerifier.create(reactiveReader.readOne()).verifyError(PulsarClientException.InvalidMessageException.class);
+		StepVerifier.create(reactiveReader.readMany()).verifyError(PulsarClientException.InvalidMessageException.class);
+	}
+
+	@Test
 	void endOfStreamPoll() throws Exception {
 		PulsarClientImpl pulsarClient = spy(
 				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
index f24c059..516abe3 100644
--- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
+++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -69,8 +69,6 @@
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
 import reactor.core.publisher.Flux;
 import reactor.test.StepVerifier;
 
@@ -169,7 +167,7 @@
 	}
 
 	@Test
-	void sendOneErrorDoesntUseCorrelatedMessageSendingException() throws Exception {
+	void sendOnePulsarException() throws Exception {
 		PulsarClientImpl pulsarClient = spy(
 				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
 
@@ -200,39 +198,6 @@
 	}
 
 	@Test
-	void sendMany() throws Exception {
-		PulsarClientImpl pulsarClient = spy(
-				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
-
-		ProducerBase<String> producer = mock(ProducerBase.class);
-		doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
-		TypedMessageBuilderImpl<String> typedMessageBuilder1 = spy(
-				new TypedMessageBuilderImpl<>(producer, Schema.STRING));
-		doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder1).sendAsync();
-		TypedMessageBuilderImpl<String> typedMessageBuilder2 = spy(
-				new TypedMessageBuilderImpl<>(producer, Schema.STRING));
-		doReturn(CompletableFuture.completedFuture(MessageId.latest)).when(typedMessageBuilder2).sendAsync();
-
-		doReturn(typedMessageBuilder1, typedMessageBuilder2).when(producer).newMessage();
-		doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
-				eq(Schema.STRING), isNull());
-
-		ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
-				.messageSender(Schema.STRING).topic("my-topic").build();
-
-		Flux<MessageSpec<String>> messageSpecs = Flux.just(MessageSpec.of("test1"), MessageSpec.of("test2"));
-		StepVerifier.create(reactiveSender.sendMany(messageSpecs).map(MessageSendResult::getMessageId))
-				.expectNext(MessageId.earliest).expectNext(MessageId.latest).verifyComplete();
-
-		verify(pulsarClient).createProducerAsync(any(), any(), isNull());
-		InOrder inOrder = Mockito.inOrder(typedMessageBuilder1, typedMessageBuilder2);
-		inOrder.verify(typedMessageBuilder1).value("test1");
-		inOrder.verify(typedMessageBuilder1).sendAsync();
-		inOrder.verify(typedMessageBuilder2).value("test2");
-		inOrder.verify(typedMessageBuilder2).sendAsync();
-	}
-
-	@Test
 	void sendManyStopOnError() throws Exception {
 		PulsarClientImpl pulsarClient = spy(
 				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
@@ -271,13 +236,15 @@
 				.assertNext((next) -> assertThat(next.getMessageId()).isEqualTo(messageIds.get(0)))
 				.verifyErrorSatisfies((throwable) -> assertThat(throwable)
 						.asInstanceOf(InstanceOfAssertFactories.type(ReactiveMessageSendingException.class))
-						.satisfies((cme) -> assertThat(cme.getMessageSpec()).isSameAs(failingMessage))
-						.satisfies((cme) -> assertThat((String) cme.getCorrelationMetadata()).isEqualTo("my-context"))
-						.satisfies((cme) -> assertThat(cme.toString()).contains("correlation metadata={my-context}")));
+						.satisfies((rmse) -> assertThat(rmse.getCause()).isInstanceOf(ProducerQueueIsFullError.class))
+						.satisfies((rmse) -> assertThat(rmse.getMessageSpec()).isSameAs(failingMessage))
+						.satisfies((rmse) -> assertThat((String) rmse.getCorrelationMetadata()).isEqualTo("my-context"))
+						.satisfies(
+								(rmse) -> assertThat(rmse.toString()).contains("correlation metadata={my-context}")));
 	}
 
 	@Test
-	void sendManyCorrelated() throws Exception {
+	void sendMany() throws Exception {
 		PulsarClientImpl pulsarClient = spy(
 				(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());