Add tests to verify that Pulsar Exceptions are propagated as-is (#118)
diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTest.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTest.java
index 5fb6c74..1218ce4 100644
--- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTest.java
+++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageConsumerTest.java
@@ -39,6 +39,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -327,4 +328,30 @@
verify(consumer).acknowledgeAsync(eq(MessageId.latest));
}
+ @Test
+ void consumePulsarException() throws Exception {
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
+
+ Consumer<String> consumer = mock(Consumer.class);
+ doReturn(CompletableFuture.completedFuture(null)).when(consumer).closeAsync();
+
+ CompletableFuture<String> failedFuture = new CompletableFuture<>();
+ failedFuture.completeExceptionally(new PulsarClientException.InvalidMessageException("test"));
+ doReturn(failedFuture).when(consumer).receiveAsync();
+
+ doReturn(CompletableFuture.completedFuture(consumer)).when(pulsarClient)
+ .subscribeAsync(any(ConsumerConfigurationData.class), eq(Schema.STRING), isNull());
+
+ ReactiveMessageConsumer<String> reactiveConsumer = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+ .messageConsumer(Schema.STRING).topic("my-topic").subscriptionName("my-sub").build();
+
+ StepVerifier.create(reactiveConsumer.consumeOne((message) -> Mono.just(MessageResult.acknowledge(message))))
+ .verifyError(PulsarClientException.InvalidMessageException.class);
+
+ StepVerifier
+ .create(reactiveConsumer.consumeMany((messages) -> messages.map(MessageResult::acknowledgeAndReturn)))
+ .verifyError(PulsarClientException.InvalidMessageException.class);
+ }
+
}
diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderTest.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderTest.java
index c10b7fe..047fbd7 100644
--- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderTest.java
+++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageReaderTest.java
@@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
@@ -129,6 +130,29 @@
}
@Test
+ void readPulsarException() throws Exception {
+ PulsarClientImpl pulsarClient = spy(
+ (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
+
+ Reader<String> reader = mock(Reader.class);
+ doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
+ doReturn(CompletableFuture.completedFuture(true)).when(reader).hasMessageAvailableAsync();
+
+ CompletableFuture<String> failedFuture = new CompletableFuture<>();
+ failedFuture.completeExceptionally(new PulsarClientException.InvalidMessageException("test"));
+ doReturn(failedFuture).when(reader).readNextAsync();
+
+ doReturn(CompletableFuture.completedFuture(reader)).when(pulsarClient).createReaderAsync(any(),
+ eq(Schema.STRING));
+
+ ReactiveMessageReader<String> reactiveReader = AdaptedReactivePulsarClientFactory.create(pulsarClient)
+ .messageReader(Schema.STRING).topic("my-topic").build();
+
+ StepVerifier.create(reactiveReader.readOne()).verifyError(PulsarClientException.InvalidMessageException.class);
+ StepVerifier.create(reactiveReader.readMany()).verifyError(PulsarClientException.InvalidMessageException.class);
+ }
+
+ @Test
void endOfStreamPoll() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
diff --git a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
index f24c059..516abe3 100644
--- a/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
+++ b/pulsar-client-reactive-adapter/src/test/java/org/apache/pulsar/reactive/client/internal/adapter/AdaptedReactiveMessageSenderTest.java
@@ -69,8 +69,6 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.InOrder;
-import org.mockito.Mockito;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
@@ -169,7 +167,7 @@
}
@Test
- void sendOneErrorDoesntUseCorrelatedMessageSendingException() throws Exception {
+ void sendOnePulsarException() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
@@ -200,39 +198,6 @@
}
@Test
- void sendMany() throws Exception {
- PulsarClientImpl pulsarClient = spy(
- (PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
-
- ProducerBase<String> producer = mock(ProducerBase.class);
- doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
- TypedMessageBuilderImpl<String> typedMessageBuilder1 = spy(
- new TypedMessageBuilderImpl<>(producer, Schema.STRING));
- doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder1).sendAsync();
- TypedMessageBuilderImpl<String> typedMessageBuilder2 = spy(
- new TypedMessageBuilderImpl<>(producer, Schema.STRING));
- doReturn(CompletableFuture.completedFuture(MessageId.latest)).when(typedMessageBuilder2).sendAsync();
-
- doReturn(typedMessageBuilder1, typedMessageBuilder2).when(producer).newMessage();
- doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
- eq(Schema.STRING), isNull());
-
- ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
- .messageSender(Schema.STRING).topic("my-topic").build();
-
- Flux<MessageSpec<String>> messageSpecs = Flux.just(MessageSpec.of("test1"), MessageSpec.of("test2"));
- StepVerifier.create(reactiveSender.sendMany(messageSpecs).map(MessageSendResult::getMessageId))
- .expectNext(MessageId.earliest).expectNext(MessageId.latest).verifyComplete();
-
- verify(pulsarClient).createProducerAsync(any(), any(), isNull());
- InOrder inOrder = Mockito.inOrder(typedMessageBuilder1, typedMessageBuilder2);
- inOrder.verify(typedMessageBuilder1).value("test1");
- inOrder.verify(typedMessageBuilder1).sendAsync();
- inOrder.verify(typedMessageBuilder2).value("test2");
- inOrder.verify(typedMessageBuilder2).sendAsync();
- }
-
- @Test
void sendManyStopOnError() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
@@ -271,13 +236,15 @@
.assertNext((next) -> assertThat(next.getMessageId()).isEqualTo(messageIds.get(0)))
.verifyErrorSatisfies((throwable) -> assertThat(throwable)
.asInstanceOf(InstanceOfAssertFactories.type(ReactiveMessageSendingException.class))
- .satisfies((cme) -> assertThat(cme.getMessageSpec()).isSameAs(failingMessage))
- .satisfies((cme) -> assertThat((String) cme.getCorrelationMetadata()).isEqualTo("my-context"))
- .satisfies((cme) -> assertThat(cme.toString()).contains("correlation metadata={my-context}")));
+ .satisfies((rmse) -> assertThat(rmse.getCause()).isInstanceOf(ProducerQueueIsFullError.class))
+ .satisfies((rmse) -> assertThat(rmse.getMessageSpec()).isSameAs(failingMessage))
+ .satisfies((rmse) -> assertThat((String) rmse.getCorrelationMetadata()).isEqualTo("my-context"))
+ .satisfies(
+ (rmse) -> assertThat(rmse.toString()).contains("correlation metadata={my-context}")));
}
@Test
- void sendManyCorrelated() throws Exception {
+ void sendMany() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());