blob: 516abe336488877ed9c9e39fc94efd482d915f3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.reactive.client.internal.adapter;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.reactive.client.adapter.AdaptedReactivePulsarClientFactory;
import org.apache.pulsar.reactive.client.api.MessageSendResult;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderBuilder;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderCache;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSendingException;
import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* Tests for {@link AdaptedReactiveMessageSender}.
*/
class AdaptedReactiveMessageSenderTest {
@Test
void sendOne() throws Exception {
MessageRouter messageRouter = new MessageRouter() {
};
BatcherBuilder batcherBuilder = () -> null;
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
ProducerBase<String> producer = mock(ProducerBase.class);
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
doReturn(typedMessageBuilder).when(producer).newMessage();
CryptoKeyReader cryptoKeyReader = mock(CryptoKeyReader.class);
ProducerConfigurationData expectedProducerConf = new ProducerConfigurationData();
expectedProducerConf.setTopicName("my-topic");
expectedProducerConf.setProducerName("my-producer");
expectedProducerConf.setSendTimeoutMs(TimeUnit.SECONDS.toMillis(1));
expectedProducerConf.setMaxPendingMessages(2);
expectedProducerConf.setMaxPendingMessagesAcrossPartitions(3);
expectedProducerConf.setMessageRoutingMode(MessageRoutingMode.CustomPartition);
expectedProducerConf.setHashingScheme(HashingScheme.Murmur3_32Hash);
expectedProducerConf.setCryptoFailureAction(ProducerCryptoFailureAction.SEND);
expectedProducerConf.setCustomMessageRouter(messageRouter);
expectedProducerConf.setBatchingMaxPublishDelayMicros(TimeUnit.SECONDS.toMicros(4));
expectedProducerConf.setBatchingPartitionSwitchFrequencyByPublishDelay(5);
expectedProducerConf.setBatchingMaxMessages(6);
expectedProducerConf.setBatchingMaxBytes(7);
expectedProducerConf.setBatchingEnabled(false);
expectedProducerConf.setBatcherBuilder(batcherBuilder);
expectedProducerConf.setChunkingEnabled(true);
expectedProducerConf.setCryptoKeyReader(cryptoKeyReader);
expectedProducerConf.setEncryptionKeys(Collections.singleton("my-key"));
expectedProducerConf.setCompressionType(CompressionType.LZ4);
expectedProducerConf.setInitialSequenceId(8L);
expectedProducerConf.setAutoUpdatePartitions(true);
expectedProducerConf.setAutoUpdatePartitionsIntervalSeconds(9);
expectedProducerConf.setMultiSchema(true);
expectedProducerConf.setAccessMode(ProducerAccessMode.Exclusive);
expectedProducerConf.setLazyStartPartitionedProducers(true);
SortedMap<String, String> properties = new TreeMap<>();
properties.put("my-key", "my-value");
expectedProducerConf.setProperties(properties);
CompletableFuture<String> failedProducer = new CompletableFuture<>();
failedProducer.completeExceptionally(new RuntimeException("didn't match expected producer conf"));
doReturn(failedProducer).when(pulsarClient).createProducerAsync(any(), eq(Schema.STRING), isNull());
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient)
.createProducerAsync(eq(expectedProducerConf), eq(Schema.STRING), isNull());
ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageSender(Schema.STRING).topic("my-topic").producerName("my-producer")
.sendTimeout(Duration.ofSeconds(1)).maxPendingMessages(2).maxPendingMessagesAcrossPartitions(3)
.messageRoutingMode(MessageRoutingMode.CustomPartition).hashingScheme(HashingScheme.Murmur3_32Hash)
.cryptoFailureAction(ProducerCryptoFailureAction.SEND).messageRouter(messageRouter)
.batchingMaxPublishDelay(Duration.ofSeconds(4)).roundRobinRouterBatchingPartitionSwitchFrequency(5)
.batchingMaxMessages(6).batchingMaxBytes(7).batchingEnabled(false).batcherBuilder(batcherBuilder)
.chunkingEnabled(true).cryptoKeyReader(cryptoKeyReader).encryptionKeys(Collections.singleton("my-key"))
.compressionType(CompressionType.LZ4).initialSequenceId(8).autoUpdatePartitions(true)
.autoUpdatePartitionsInterval(Duration.ofSeconds(9)).multiSchema(true)
.accessMode(ProducerAccessMode.Exclusive).lazyStartPartitionedProducers(true)
.property("my-key", "my-value").clone().build();
MessageSpec<String> messageSpec = spy(MessageSpec.of("test"));
MessageId messageId1 = reactiveSender.sendOne(messageSpec).block(Duration.ofSeconds(5));
verify(pulsarClient).createProducerAsync(any(), any(), isNull());
verify((InternalMessageSpec<String>) messageSpec).configure(typedMessageBuilder);
assertThat(messageId1).isEqualTo(MessageId.earliest);
}
@Test
void sendOnePulsarException() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
ProducerBase<String> producer = mock(ProducerBase.class);
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
given(producer.newMessage()).willAnswer((__) -> {
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
CompletableFuture<MessageId> failed = new CompletableFuture<>();
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
return failed;
});
return typedMessageBuilder;
});
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
eq(Schema.STRING), isNull());
ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageSender(Schema.STRING).topic("my-topic").build();
StepVerifier.create(reactiveSender.sendOne(MessageSpec.of("test1")))
// the original exception should be returned without wrapping it in
// ReactiveMessageSendingException
.expectError(ProducerQueueIsFullError.class).verify();
}
@Test
void sendManyStopOnError() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
ProducerBase<String> producer = mock(ProducerBase.class);
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
AtomicInteger entryId = new AtomicInteger();
List<MessageId> messageIds = new CopyOnWriteArrayList<>();
given(producer.newMessage()).willAnswer((__) -> {
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
if (entryId.get() == 1) {
CompletableFuture<MessageId> failed = new CompletableFuture<>();
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
return failed;
}
MessageId messageId = DefaultImplementation.getDefaultImplementation().newMessageId(1,
entryId.incrementAndGet(), 1);
messageIds.add(messageId);
return CompletableFuture.completedFuture(messageId);
});
return typedMessageBuilder;
});
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
eq(Schema.STRING), isNull());
ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageSender(Schema.STRING).topic("my-topic").stopOnError(true).build();
MessageSpec<String> failingMessage = MessageSpec.builder("test2").correlationMetadata("my-context").build();
Flux<MessageSpec<String>> messageSpecs = Flux.just(MessageSpec.of("test1"), failingMessage);
StepVerifier.create(reactiveSender.sendMany(messageSpecs))
.assertNext((next) -> assertThat(next.getMessageId()).isEqualTo(messageIds.get(0)))
.verifyErrorSatisfies((throwable) -> assertThat(throwable)
.asInstanceOf(InstanceOfAssertFactories.type(ReactiveMessageSendingException.class))
.satisfies((rmse) -> assertThat(rmse.getCause()).isInstanceOf(ProducerQueueIsFullError.class))
.satisfies((rmse) -> assertThat(rmse.getMessageSpec()).isSameAs(failingMessage))
.satisfies((rmse) -> assertThat((String) rmse.getCorrelationMetadata()).isEqualTo("my-context"))
.satisfies(
(rmse) -> assertThat(rmse.toString()).contains("correlation metadata={my-context}")));
}
@Test
void sendMany() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
ProducerBase<String> producer = mock(ProducerBase.class);
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
AtomicInteger entryId = new AtomicInteger();
List<MessageId> messageIds = new CopyOnWriteArrayList<>();
given(producer.newMessage()).willAnswer((__) -> {
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
if (entryId.get() == 2) {
CompletableFuture<MessageId> failed = new CompletableFuture<>();
failed.completeExceptionally(new ProducerQueueIsFullError("Queue is full"));
return failed;
}
MessageId messageId = DefaultImplementation.getDefaultImplementation().newMessageId(1,
entryId.incrementAndGet(), 1);
messageIds.add(messageId);
return CompletableFuture.completedFuture(messageId);
});
return typedMessageBuilder;
});
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
eq(Schema.STRING), isNull());
ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageSender(Schema.STRING).topic("my-topic").build();
MessageSpec<String> messageSpec1 = MessageSpec.of("test1");
Flux<MessageSpec<String>> keysAndMessageSpecs = Flux.just(messageSpec1,
MessageSpec.builder("test2").correlationMetadata(456).build(),
MessageSpec.builder("test3").correlationMetadata(789).build());
StepVerifier.create(reactiveSender.sendMany(keysAndMessageSpecs)).assertNext((next) -> {
assertThat(next.getMessageId()).isEqualTo(messageIds.get(0));
assertThat(next.getMessageSpec()).isEqualTo(messageSpec1);
assertThat(next.getException()).isNull();
}).assertNext((next) -> {
assertThat(next.getMessageId()).isEqualTo(messageIds.get(1));
assertThat((int) next.getCorrelationMetadata()).isEqualTo(456);
assertThat(next.getException()).isNull();
}).assertNext((next) -> {
assertThat(next.getMessageId()).isNull();
assertThat((int) next.getCorrelationMetadata()).isEqualTo(789);
assertThat(next.getException()).isInstanceOf(ProducerQueueIsFullError.class);
}).verifyComplete();
}
@ParameterizedTest
@MethodSource
void senderCache(String name, ReactiveMessageSenderCache cache) throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
ProducerBase<String> producer = mock(ProducerBase.class);
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
doReturn(true).when(producer).isConnected();
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
doReturn(typedMessageBuilder).when(producer).newMessage();
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
eq(Schema.STRING), isNull());
ProducerBase<Integer> producer2 = mock(ProducerBase.class);
doReturn(CompletableFuture.completedFuture(null)).when(producer2).closeAsync();
doReturn(CompletableFuture.completedFuture(null)).when(producer2).flushAsync();
doReturn(true).when(producer2).isConnected();
TypedMessageBuilderImpl<Integer> typedMessageBuilder2 = spy(
new TypedMessageBuilderImpl<>(producer2, Schema.INT32));
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder2).sendAsync();
doReturn(typedMessageBuilder2).when(producer2).newMessage();
doReturn(CompletableFuture.completedFuture(producer2)).when(pulsarClient).createProducerAsync(any(),
eq(Schema.INT32), isNull());
// Sender without cache
createSenderAndSendMessages(pulsarClient, Schema.STRING, "my-topic", null, new String[] { "a", "b", "c" });
verify(pulsarClient, times(3)).createProducerAsync(any(), any(), isNull());
// Sender with cache
createSenderAndSendMessages(pulsarClient, Schema.STRING, "my-topic", cache, new String[] { "a", "b", "c" });
verify(pulsarClient, times(4)).createProducerAsync(any(), any(), isNull());
// Other sender wih same cache, same Schema, same Producer config
createSenderAndSendMessages(pulsarClient, Schema.STRING, "my-topic", cache, new String[] { "d", "e", "f" });
verify(pulsarClient, times(4)).createProducerAsync(any(), any(), isNull());
// Other sender wih same cache, same Schema, different Producer config
createSenderAndSendMessages(pulsarClient, Schema.STRING, "my-other-topic", cache,
new String[] { "a", "b", "c" });
verify(pulsarClient, times(5)).createProducerAsync(any(), any(), isNull());
// Other sender wih same cache, different Schema, same Producer config
createSenderAndSendMessages(pulsarClient, Schema.INT32, "my-topic", cache, new Integer[] { 42, 43, 44 });
verify(pulsarClient, times(6)).createProducerAsync(any(), any(), isNull());
}
private static Stream<Arguments> senderCache() {
return Arrays.asList(
Arguments.of("ConcurrentHashMapProducerCacheProvider",
AdaptedReactivePulsarClientFactory.createCache(new ConcurrentHashMapProducerCacheProvider())),
Arguments.of("Default", AdaptedReactivePulsarClientFactory.createCache())).stream();
}
private static <T> void createSenderAndSendMessages(PulsarClient client, Schema<T> schema, String topic,
ReactiveMessageSenderCache cache, T[] values) {
assertThat(values).hasSize(3);
ReactiveMessageSenderBuilder<T> builder = AdaptedReactivePulsarClientFactory.create(client)
.messageSender(schema).topic(topic);
if (cache != null) {
builder.cache(cache);
}
ReactiveMessageSender<T> sender = builder.build();
sender.sendOne(MessageSpec.of(values[0])).then(sender.sendOne(MessageSpec.of(values[1])))
.thenMany(Flux.just(MessageSpec.of(values[2])).as(sender::sendMany)).blockLast(Duration.ofSeconds(5));
}
@Test
void senderCacheEntryRecreatedIfProducerClosed() throws Exception {
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
ProducerBase<String> producer = mock(ProducerBase.class);
doReturn(CompletableFuture.completedFuture(null)).when(producer).closeAsync();
doReturn(CompletableFuture.completedFuture(null)).when(producer).flushAsync();
doReturn(true).when(producer).isConnected();
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
doReturn(CompletableFuture.completedFuture(MessageId.earliest)).when(typedMessageBuilder).sendAsync();
doReturn(typedMessageBuilder).when(producer).newMessage();
doReturn(CompletableFuture.completedFuture(producer)).when(pulsarClient).createProducerAsync(any(),
eq(Schema.STRING), isNull());
ReactiveMessageSenderCache cache = AdaptedReactivePulsarClientFactory.createCache();
createSenderAndSendMessages(pulsarClient, Schema.STRING, "my-topic", cache, new String[] { "a", "b", "c" });
ReactiveMessageSender<String> sender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageSender(Schema.STRING).topic("my-topic").cache(cache).build();
sender.sendOne(MessageSpec.of("a")).block(Duration.ofSeconds(5));
sender.sendOne(MessageSpec.of("b")).block(Duration.ofSeconds(5));
verify(pulsarClient).createProducerAsync(any(), any(), isNull());
// Disconnect the producer and send a new message
doReturn(false).when(producer).isConnected();
CompletableFuture<MessageId> messageIdFuture = sender.sendOne(MessageSpec.of("c")).toFuture();
Thread.sleep(100);
// Check that the disconnected producer is flushed and closed and a new producer
// is created
verify(producer).closeAsync();
verify(producer).flushAsync();
verify(pulsarClient, times(2)).createProducerAsync(any(), any(), isNull());
// Check that we wait for the producer to be reconnected
Thread.sleep(1000);
assertThat(messageIdFuture.isDone()).isFalse();
// Re-connect the producer
doReturn(true).when(producer).isConnected();
messageIdFuture.get(5, TimeUnit.SECONDS);
verify(pulsarClient, times(2)).createProducerAsync(any(), any(), isNull());
sender.sendOne(MessageSpec.of("d")).block(Duration.ofSeconds(5));
verify(pulsarClient, times(2)).createProducerAsync(any(), any(), isNull());
// Verify that an error is emitted if the producer doesn't reconnect in time
doReturn(false).when(producer).isConnected();
Duration reconnectTimeout = StepVerifier.create(sender.sendOne(MessageSpec.of("c")))
.verifyError(IllegalStateException.class);
assertThat(reconnectTimeout).isBetween(Duration.ofSeconds(4), Duration.ofSeconds(5));
}
@ParameterizedTest
@CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
void maxInFlightUsingSendOne(int maxInflight, int maxElements) throws Exception {
doTestMaxInFlight(
(reactiveSender, inputFlux) -> inputFlux
.flatMap((i) -> reactiveSender.sendOne(MessageSpec.of(String.valueOf(i))), 100),
maxInflight, maxElements);
}
@ParameterizedTest
@CsvSource({ "7,100", "13,100", "37,500", "51,1000" })
void maxInFlightUsingSendMany(int maxInflight, int maxElements) throws Exception {
doTestMaxInFlight(
(reactiveSender, inputFlux) -> inputFlux.window(3)
.flatMap((subFlux) -> subFlux.map((i) -> MessageSpec.of(String.valueOf(i)))
.as(reactiveSender::sendMany).map(MessageSendResult::getMessageId), 100),
maxInflight, maxElements);
}
void doTestMaxInFlight(BiFunction<ReactiveMessageSender<String>, Flux<Integer>, Flux<MessageId>> sendingFunction,
int maxInflight, int maxElements) throws Exception {
ScheduledExecutorService executorService = null;
try {
executorService = Executors.newSingleThreadScheduledExecutor();
final ScheduledExecutorService finalExecutorService = executorService;
PulsarClientImpl pulsarClient = spy(
(PulsarClientImpl) PulsarClient.builder().serviceUrl("http://dummy").build());
AtomicLong totalRequests = new AtomicLong();
AtomicLong requestsMax = new AtomicLong();
ProducerBase<String> producer = mock(ProducerBase.class);
given(producer.closeAsync()).willReturn(CompletableFuture.completedFuture(null));
given(producer.isConnected()).willReturn(true);
given(producer.newMessage()).willAnswer((__) -> {
TypedMessageBuilderImpl<String> typedMessageBuilder = spy(
new TypedMessageBuilderImpl<>(producer, Schema.STRING));
given(typedMessageBuilder.sendAsync()).willAnswer((___) -> {
CompletableFuture<MessageId> messageSender = new CompletableFuture<>();
finalExecutorService.execute(() -> {
long current = totalRequests.incrementAndGet();
requestsMax.accumulateAndGet(current, Math::max);
});
finalExecutorService.schedule(() -> {
totalRequests.decrementAndGet();
// encode integer in message value to entry id in message id
int encodedEntryId = Integer.parseInt(typedMessageBuilder.getMessage().getValue());
messageSender.complete(
DefaultImplementation.getDefaultImplementation().newMessageId(1, encodedEntryId, 1));
}, 100, TimeUnit.MILLISECONDS);
return messageSender;
});
return typedMessageBuilder;
});
given(pulsarClient.createProducerAsync(any(), eq(Schema.STRING), isNull()))
.willReturn(CompletableFuture.completedFuture(producer));
ReactiveMessageSender<String> reactiveSender = AdaptedReactivePulsarClientFactory.create(pulsarClient)
.messageSender(Schema.STRING).maxInflight(maxInflight)
.cache(AdaptedReactivePulsarClientFactory.createCache()).maxConcurrentSenderSubscriptions(1024)
.topic("my-topic").build();
List<Integer> inputValues = IntStream.rangeClosed(1, maxElements).boxed().collect(Collectors.toList());
Flux<Integer> inputFlux = Flux.fromIterable(inputValues);
Flux<MessageId> outputFlux = sendingFunction.apply(reactiveSender, inputFlux);
// get message value from encoded entry id in message id
List<Integer> outputValues = outputFlux.map((m) -> (int) ((MessageIdImpl) m).getEntryId()).collectList()
.block();
assertThat(outputValues).containsExactlyInAnyOrderElementsOf(inputValues);
assertThat(requestsMax.get()).isEqualTo(maxInflight);
}
finally {
if (executorService != null) {
executorService.shutdownNow();
}
}
}
}