blob: c3a8b267f766243fd773617bb25cef19b1198f0d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.reactive.client.internal.adapter;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.reactive.client.api.MessageSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSender;
import org.apache.pulsar.reactive.client.api.ReactiveMessageSenderSpec;
import org.apache.pulsar.reactive.client.internal.api.InternalMessageSpec;
import org.apache.pulsar.reactive.client.internal.api.PublisherTransformer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
class AdaptedReactiveMessageSender<T> implements ReactiveMessageSender<T> {
private final Schema<T> schema;
private final ReactiveMessageSenderSpec senderSpec;
private final int maxConcurrency;
private final ReactiveProducerAdapterFactory reactiveProducerAdapterFactory;
private final ProducerCache producerCache;
private final Supplier<PublisherTransformer> producerActionTransformer;
private final Object producerActionTransformerKey;
AdaptedReactiveMessageSender(Schema<T> schema, ReactiveMessageSenderSpec senderSpec, int maxConcurrency,
ReactiveProducerAdapterFactory reactiveProducerAdapterFactory, ProducerCache producerCache,
Supplier<PublisherTransformer> producerActionTransformer, Object producerActionTransformerKey) {
this.schema = schema;
this.senderSpec = senderSpec;
this.maxConcurrency = maxConcurrency;
this.reactiveProducerAdapterFactory = reactiveProducerAdapterFactory;
this.producerCache = producerCache;
this.producerActionTransformer = producerActionTransformer;
this.producerActionTransformerKey = producerActionTransformerKey;
}
ReactiveProducerAdapter<T> createReactiveProducerAdapter() {
return this.reactiveProducerAdapterFactory.create((pulsarClient) -> {
ProducerBuilder<T> producerBuilder = pulsarClient.newProducer(this.schema);
configureProducerBuilder(producerBuilder);
return producerBuilder;
}, this.producerCache, this.producerActionTransformer, this.producerActionTransformerKey);
}
private void configureProducerBuilder(ProducerBuilder<T> producerBuilder) {
if (this.senderSpec.getTopicName() != null) {
producerBuilder.topic(this.senderSpec.getTopicName());
}
if (this.senderSpec.getProducerName() != null) {
producerBuilder.producerName(this.senderSpec.getProducerName());
}
if (this.senderSpec.getSendTimeout() != null) {
producerBuilder.sendTimeout((int) (this.senderSpec.getSendTimeout().toMillis() / 1000L), TimeUnit.SECONDS);
}
if (this.senderSpec.getMaxPendingMessages() != null) {
producerBuilder.maxPendingMessages(this.senderSpec.getMaxPendingMessages());
}
if (this.senderSpec.getMaxPendingMessagesAcrossPartitions() != null) {
producerBuilder.maxPendingMessagesAcrossPartitions(this.senderSpec.getMaxPendingMessagesAcrossPartitions());
}
if (this.senderSpec.getMessageRoutingMode() != null) {
producerBuilder.messageRoutingMode(this.senderSpec.getMessageRoutingMode());
}
if (this.senderSpec.getHashingScheme() != null) {
producerBuilder.hashingScheme(this.senderSpec.getHashingScheme());
}
if (this.senderSpec.getCryptoFailureAction() != null) {
producerBuilder.cryptoFailureAction(this.senderSpec.getCryptoFailureAction());
}
if (this.senderSpec.getMessageRouter() != null) {
producerBuilder.messageRouter(this.senderSpec.getMessageRouter());
}
if (this.senderSpec.getBatchingMaxPublishDelay() != null) {
producerBuilder.batchingMaxPublishDelay(this.senderSpec.getBatchingMaxPublishDelay().toNanos(),
TimeUnit.NANOSECONDS);
}
if (this.senderSpec.getRoundRobinRouterBatchingPartitionSwitchFrequency() != null) {
producerBuilder.roundRobinRouterBatchingPartitionSwitchFrequency(
this.senderSpec.getRoundRobinRouterBatchingPartitionSwitchFrequency());
}
if (this.senderSpec.getBatchingMaxMessages() != null) {
producerBuilder.batchingMaxMessages(this.senderSpec.getBatchingMaxMessages());
}
if (this.senderSpec.getBatchingMaxBytes() != null) {
producerBuilder.batchingMaxBytes(this.senderSpec.getBatchingMaxBytes());
}
if (this.senderSpec.getBatchingEnabled() != null) {
producerBuilder.enableBatching(this.senderSpec.getBatchingEnabled());
}
if (this.senderSpec.getBatcherBuilder() != null) {
producerBuilder.batcherBuilder(this.senderSpec.getBatcherBuilder());
}
if (this.senderSpec.getChunkingEnabled() != null) {
producerBuilder.enableChunking(this.senderSpec.getChunkingEnabled());
}
if (this.senderSpec.getCryptoKeyReader() != null) {
producerBuilder.cryptoKeyReader(this.senderSpec.getCryptoKeyReader());
}
if (this.senderSpec.getEncryptionKeys() != null && !this.senderSpec.getEncryptionKeys().isEmpty()) {
this.senderSpec.getEncryptionKeys().forEach(producerBuilder::addEncryptionKey);
}
if (this.senderSpec.getCompressionType() != null) {
producerBuilder.compressionType(this.senderSpec.getCompressionType());
}
if (this.senderSpec.getInitialSequenceId() != null) {
producerBuilder.initialSequenceId(this.senderSpec.getInitialSequenceId());
}
if (this.senderSpec.getAutoUpdatePartitions() != null) {
producerBuilder.autoUpdatePartitions(this.senderSpec.getAutoUpdatePartitions());
}
if (this.senderSpec.getAutoUpdatePartitionsInterval() != null) {
producerBuilder.autoUpdatePartitionsInterval(
(int) (this.senderSpec.getAutoUpdatePartitionsInterval().toMillis() / 1000L), TimeUnit.SECONDS);
}
if (this.senderSpec.getMultiSchema() != null) {
producerBuilder.enableMultiSchema(this.senderSpec.getMultiSchema());
}
if (this.senderSpec.getAccessMode() != null) {
producerBuilder.accessMode(this.senderSpec.getAccessMode());
}
if (this.senderSpec.getLazyStartPartitionedProducers() != null) {
producerBuilder.enableLazyStartPartitionedProducers(this.senderSpec.getLazyStartPartitionedProducers());
}
if (this.senderSpec.getProperties() != null && !this.senderSpec.getProperties().isEmpty()) {
producerBuilder
.properties(Collections.unmodifiableMap(new LinkedHashMap<>(this.senderSpec.getProperties())));
}
}
@Override
public Mono<MessageId> sendOne(MessageSpec<T> messageSpec) {
return createReactiveProducerAdapter()
.usingProducer((producer, transformer) -> createMessageMono(messageSpec, producer, transformer));
}
private Mono<MessageId> createMessageMono(MessageSpec<T> messageSpec, Producer<T> producer,
PublisherTransformer transformer) {
return PulsarFutureAdapter.adaptPulsarFuture(() -> {
TypedMessageBuilder<T> typedMessageBuilder = producer.newMessage();
((InternalMessageSpec<T>) messageSpec).configure(typedMessageBuilder);
return typedMessageBuilder.sendAsync();
}).transform(transformer::transform);
}
@Override
public Flux<MessageId> sendMany(Publisher<MessageSpec<T>> messageSpecs) {
return createReactiveProducerAdapter()
.usingProducerMany((producer, transformer) -> Flux.from(messageSpecs).flatMapSequential(
(messageSpec) -> createMessageMono(messageSpec, producer, transformer), this.maxConcurrency));
}
}