blob: 047fbd7cd861b16ede1fe5a82dbed1840ce43cbc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.reactive.client.internal.adapter;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.EndOfStreamAction;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReader;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Tests for {@link AdaptedReactiveMessageReader}.
*/
class AdaptedReactiveMessageReaderTest {
@Test
void readOne() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
Reader<String> reader = mock(Reader.class);
doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
doReturn(CompletableFuture.completedFuture(true)).when(reader).hasMessageAvailableAsync();
Message<String> message = mock(Message.class);
doReturn(CompletableFuture.completedFuture(message)).when(reader).readNextAsync();
CryptoKeyReader cryptoKeyReader = mock(CryptoKeyReader.class);
ReaderConfigurationData<String> expectedReaderConf = new ReaderConfigurationData<>();
expectedReaderConf.setTopicNames(Collections.singleton("my-topic"));
expectedReaderConf.setReaderName("my-reader");
expectedReaderConf.setSubscriptionName("my-sub");
expectedReaderConf.setSubscriptionRolePrefix("my-prefix-");
expectedReaderConf.setStartMessageId(MessageId.latest);
expectedReaderConf.setResetIncludeHead(true);
expectedReaderConf.setReceiverQueueSize(1);
expectedReaderConf.setReadCompacted(true);
expectedReaderConf.setCryptoKeyReader(cryptoKeyReader);
expectedReaderConf.setKeyHashRanges(Collections.singletonList(new Range(2, 3)));
expectedReaderConf.setCryptoFailureAction(ConsumerCryptoFailureAction.DISCARD);
CompletableFuture<String> failedReader = new CompletableFuture<>();
failedReader.completeExceptionally(new RuntimeException("didn't match expected reader conf"));
doReturn(failedReader).when(pulsarClient).createReaderAsync(any(), eq(Schema.STRING));
doReturn(CompletableFuture.completedFuture(reader)).when(pulsarClient).createReaderAsync(eq(expectedReaderConf),
eq(Schema.STRING));
ReactiveMessageReader<String> reactiveReader = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageReader(Schema.STRING).topic("my-topic").readerName("my-reader").subscriptionName("my-sub")
.generatedSubscriptionNamePrefix("my-prefix-").startAtSpec(StartAtSpec.ofLatestInclusive())
.receiverQueueSize(1).readCompacted(true).cryptoKeyReader(cryptoKeyReader)
.keyHashRanges(Collections.singletonList(new Range(2, 3)))
.cryptoFailureAction(ConsumerCryptoFailureAction.DISCARD).clone().build();
Message<String> messageRead = reactiveReader.readOne().block(Duration.ofSeconds(5));
verify(pulsarClient).createReaderAsync(any(), any());
assertThat(messageRead).isSameAs(message);
}
@Test
void readMany() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
Reader<String> reader = mock(Reader.class);
doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
doReturn(CompletableFuture.completedFuture(true), CompletableFuture.completedFuture(true),
CompletableFuture.completedFuture(false)).when(reader).hasMessageAvailableAsync();
Message<String> message1 = mock(Message.class);
Message<String> message2 = mock(Message.class);
doReturn(CompletableFuture.completedFuture(message1), CompletableFuture.completedFuture(message2),
new CompletableFuture<String>()).when(reader).readNextAsync();
doReturn(CompletableFuture.completedFuture(reader)).when(pulsarClient).createReaderAsync(any(),
eq(Schema.STRING));
ReactiveMessageReader<String> reactiveReader = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageReader(Schema.STRING).topic("my-topic").build();
StepVerifier.create(reactiveReader.readMany()).expectNext(message1).expectNext(message2).verifyComplete();
verify(reader, times(2)).readNextAsync();
}
@Test
void readPulsarException() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
Reader<String> reader = mock(Reader.class);
doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
doReturn(CompletableFuture.completedFuture(true)).when(reader).hasMessageAvailableAsync();
CompletableFuture<String> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(new PulsarClientException.InvalidMessageException("test"));
doReturn(failedFuture).when(reader).readNextAsync();
doReturn(CompletableFuture.completedFuture(reader)).when(pulsarClient).createReaderAsync(any(),
eq(Schema.STRING));
ReactiveMessageReader<String> reactiveReader = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageReader(Schema.STRING).topic("my-topic").build();
StepVerifier.create(reactiveReader.readOne()).verifyError(PulsarClientException.InvalidMessageException.class);
StepVerifier.create(reactiveReader.readMany()).verifyError(PulsarClientException.InvalidMessageException.class);
}
@Test
void endOfStreamPoll() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
Reader<String> reader = mock(Reader.class);
doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
doReturn(CompletableFuture.completedFuture(true), CompletableFuture.completedFuture(true),
CompletableFuture.completedFuture(false)).when(reader).hasMessageAvailableAsync();
Message<String> message1 = mock(Message.class);
Message<String> message2 = mock(Message.class);
doReturn(CompletableFuture.completedFuture(message1), CompletableFuture.completedFuture(message2),
new CompletableFuture<String>()).when(reader).readNextAsync();
doReturn(CompletableFuture.completedFuture(reader)).when(pulsarClient).createReaderAsync(any(),
eq(Schema.STRING));
ReactiveMessageReader<String> reactiveReader = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageReader(Schema.STRING).topic("my-topic").endOfStreamAction(EndOfStreamAction.POLL).build();
StepVerifier.create(reactiveReader.readMany().timeout(Duration.ofMillis(100))).expectNext(message1)
.expectNext(message2).verifyError();
verify(reader, times(3)).readNextAsync();
}
@Test
void startAtInstant() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
Reader<String> reader = mock(Reader.class);
doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
doReturn(CompletableFuture.completedFuture(true)).when(reader).hasMessageAvailableAsync();
Message<String> message = mock(Message.class);
doReturn(CompletableFuture.completedFuture(message)).when(reader).readNextAsync();
ReaderConfigurationData<String> expectedReaderConf = new ReaderConfigurationData<>();
expectedReaderConf.setTopicNames(Collections.singleton("my-topic"));
expectedReaderConf.setStartMessageId(MessageId.earliest);
expectedReaderConf.setStartMessageFromRollbackDurationInSec(6);
CompletableFuture<String> failedReader = new CompletableFuture<>();
failedReader.completeExceptionally(new RuntimeException("didn't match expected reader conf"));
doReturn(failedReader).when(pulsarClient).createReaderAsync(any(), eq(Schema.STRING));
doReturn(CompletableFuture.completedFuture(reader)).when(pulsarClient).createReaderAsync(eq(expectedReaderConf),
eq(Schema.STRING));
Instant now = Instant.now();
ReactiveMessageReader<String> reactiveReader = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageReader(Schema.STRING).topic("my-topic").startAtSpec(StartAtSpec.ofInstant(now.minusSeconds(5)))
.build();
try (MockedStatic<Instant> mockedStatic = mockStatic(Instant.class)) {
mockedStatic.when(Instant::now).thenReturn(now);
mockedStatic.when(() -> Instant.from(any())).thenCallRealMethod();
Message<String> messageRead = reactiveReader.readOne().block(Duration.ofSeconds(5));
verify(pulsarClient).createReaderAsync(any(), any());
assertThat(messageRead).isSameAs(message);
}
}
@Test
void startAtInstantInTheFutureFails() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
Reader<String> reader = mock(Reader.class);
doReturn(CompletableFuture.completedFuture(null)).when(reader).closeAsync();
doReturn(CompletableFuture.completedFuture(true)).when(reader).hasMessageAvailableAsync();
Message<String> message = mock(Message.class);
doReturn(CompletableFuture.completedFuture(message)).when(reader).readNextAsync();
doReturn(CompletableFuture.completedFuture(reader)).when(pulsarClient).createReaderAsync(any(),
eq(Schema.STRING));
ReactiveMessageReader<String> reactiveReader = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageReader(Schema.STRING).topic("my-topic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().plusSeconds(5))).build();
StepVerifier.create(reactiveReader.readOne()).verifyError(IllegalArgumentException.class);
}
}