blob: 0385c56e4935f176bd91e9409d1d3243b6ddffce [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.reactive.client.internal.adapter;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.EndOfStreamAction;
import org.apache.pulsar.reactive.client.api.InstantStartAtSpec;
import org.apache.pulsar.reactive.client.api.MessageIdStartAtSpec;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReader;
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderSpec;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * {@link ReactiveMessageReader} implementation that delegates to a Pulsar
 * {@link Reader} obtained through a {@link ReactiveReaderAdapterFactory}.
 * <p>
 * Every call to {@link #readOne()} or {@link #readMany()} asks the factory for a new
 * {@link ReactiveReaderAdapter} configured from the {@link ReactiveMessageReaderSpec}
 * and the {@link StartAtSpec} given at construction time.
 *
 * @param <T> the message payload type
 */
class AdaptedReactiveMessageReader<T> implements ReactiveMessageReader<T> {

	private final Schema<T> schema;

	private final ReactiveMessageReaderSpec readerSpec;

	private final ReactiveReaderAdapterFactory reactiveReaderAdapterFactory;

	private final StartAtSpec startAtSpec;

	private final EndOfStreamAction endOfStreamAction;

	/**
	 * Creates a reader adapter.
	 * @param reactiveReaderAdapterFactory factory used to create the underlying reader
	 * adapters
	 * @param schema the schema passed to {@link PulsarClient#newReader(Schema)}
	 * @param readerSpec the reader settings applied to the {@link ReaderBuilder}
	 * @param startAtSpec the position to start reading from; may be {@code null}, in
	 * which case no start position is set on the builder
	 * @param endOfStreamAction what to do when no more messages are available
	 */
	AdaptedReactiveMessageReader(ReactiveReaderAdapterFactory reactiveReaderAdapterFactory, Schema<T> schema,
			ReactiveMessageReaderSpec readerSpec, StartAtSpec startAtSpec, EndOfStreamAction endOfStreamAction) {
		this.schema = schema;
		this.readerSpec = readerSpec;
		this.reactiveReaderAdapterFactory = reactiveReaderAdapterFactory;
		this.startAtSpec = startAtSpec;
		this.endOfStreamAction = endOfStreamAction;
	}

	/**
	 * Builds a {@link Mono} that reads the next message from the given reader.
	 * <p>
	 * With {@link EndOfStreamAction#COMPLETE} the reader is first asked whether a
	 * message is available; the read is only issued when it answers {@code true},
	 * otherwise the returned Mono completes empty. With any other action the read is
	 * issued unconditionally.
	 * @param reader the Pulsar reader to read from
	 * @param endOfStreamAction the end-of-stream behaviour
	 * @param <T> the message payload type
	 * @return a Mono emitting the next message, or completing empty as described above
	 */
	static <T> Mono<Message<T>> readNextMessage(Reader<T> reader, EndOfStreamAction endOfStreamAction) {
		Mono<Message<T>> messageMono = PulsarFutureAdapter.adaptPulsarFuture(reader::readNextAsync);
		if (endOfStreamAction == EndOfStreamAction.COMPLETE) {
			// filter() drops a "false" answer, so flatMap (and therefore the read) is
			// skipped when nothing is available.
			return PulsarFutureAdapter.adaptPulsarFuture(reader::hasMessageAvailableAsync)
					.filter(Boolean::booleanValue)
					.flatMap((__) -> messageMono);
		}
		else {
			return messageMono;
		}
	}

	/**
	 * Creates a reader adapter whose reader starts at the given position.
	 * @param startAtSpec the start position, or {@code null} to set none
	 * @return the adapter produced by the factory
	 */
	ReactiveReaderAdapter<T> createReactiveReaderAdapter(StartAtSpec startAtSpec) {
		return this.reactiveReaderAdapterFactory.create(readerStartingAt(startAtSpec));
	}

	/**
	 * Returns a function that, given a {@link PulsarClient}, produces a fully
	 * configured {@link ReaderBuilder} positioned according to {@code startAtSpec}.
	 * <p>
	 * A {@link MessageIdStartAtSpec} is mapped to a start message id (optionally
	 * inclusive). Any other non-null spec is treated as an {@link InstantStartAtSpec}
	 * and mapped to a rollback duration in seconds relative to the current time.
	 * @param startAtSpec the start position, or {@code null} to set none
	 * @return the builder-producing function; invoking it throws
	 * {@link IllegalArgumentException} when an instant spec lies one or more whole
	 * seconds in the future
	 */
	private Function<PulsarClient, ReaderBuilder<T>> readerStartingAt(StartAtSpec startAtSpec) {
		return (pulsarClient) -> {
			ReaderBuilder<T> readerBuilder = pulsarClient.newReader(this.schema);
			if (startAtSpec != null) {
				if (startAtSpec instanceof MessageIdStartAtSpec) {
					MessageIdStartAtSpec messageIdStartAtSpec = (MessageIdStartAtSpec) startAtSpec;
					readerBuilder.startMessageId(messageIdStartAtSpec.getMessageId());
					if (messageIdStartAtSpec.isInclusive()) {
						readerBuilder.startMessageIdInclusive();
					}
				}
				else {
					InstantStartAtSpec instantStartAtSpec = (InstantStartAtSpec) startAtSpec;
					long elapsedSeconds = ChronoUnit.SECONDS.between(instantStartAtSpec.getInstant(),
							Instant.now());
					// Validate the raw difference, before padding: checking the padded
					// value would let an instant between one and two seconds in the
					// future through with a rollback duration of 0.
					if (elapsedSeconds < 0L) {
						throw new IllegalArgumentException("InstantStartAtSpec must be in the past.");
					}
					// between() yields whole seconds only, so one second is added to
					// make the rollback reach at least as far back as the instant.
					long rollbackDuration = elapsedSeconds + 1L;
					readerBuilder.startMessageFromRollbackDuration(rollbackDuration, TimeUnit.SECONDS);
				}
			}
			configureReaderBuilder(readerBuilder);
			return readerBuilder;
		};
	}

	/**
	 * Copies the settings of the {@link ReactiveMessageReaderSpec} onto the builder.
	 * Topic names are always applied; every other setting is applied only when the
	 * spec holds a non-null value (and, for key hash ranges, a non-empty one).
	 * @param readerBuilder the builder to configure
	 */
	private void configureReaderBuilder(ReaderBuilder<T> readerBuilder) {
		readerBuilder.topics(this.readerSpec.getTopicNames());
		if (this.readerSpec.getReaderName() != null) {
			readerBuilder.readerName(this.readerSpec.getReaderName());
		}
		if (this.readerSpec.getSubscriptionName() != null) {
			readerBuilder.subscriptionName(this.readerSpec.getSubscriptionName());
		}
		if (this.readerSpec.getGeneratedSubscriptionNamePrefix() != null) {
			readerBuilder.subscriptionRolePrefix(this.readerSpec.getGeneratedSubscriptionNamePrefix());
		}
		if (this.readerSpec.getReceiverQueueSize() != null) {
			readerBuilder.receiverQueueSize(this.readerSpec.getReceiverQueueSize());
		}
		if (this.readerSpec.getReadCompacted() != null) {
			readerBuilder.readCompacted(this.readerSpec.getReadCompacted());
		}
		if (this.readerSpec.getKeyHashRanges() != null && !this.readerSpec.getKeyHashRanges().isEmpty()) {
			readerBuilder.keyHashRange(this.readerSpec.getKeyHashRanges().toArray(new Range[0]));
		}
		if (this.readerSpec.getCryptoKeyReader() != null) {
			readerBuilder.cryptoKeyReader(this.readerSpec.getCryptoKeyReader());
		}
		if (this.readerSpec.getCryptoFailureAction() != null) {
			readerBuilder.cryptoFailureAction(this.readerSpec.getCryptoFailureAction());
		}
	}

	/**
	 * Reads a single message using a newly created reader adapter.
	 * @return a Mono for the next message; see
	 * {@link #readNextMessage(Reader, EndOfStreamAction)} for the end-of-stream
	 * behaviour
	 */
	@Override
	public Mono<Message<T>> readOne() {
		return createReactiveReaderAdapter(this.startAtSpec)
				.usingReader((reader) -> readNextMessage(reader, this.endOfStreamAction));
	}

	/**
	 * Reads messages repeatedly using a newly created reader adapter.
	 * <p>
	 * With {@link EndOfStreamAction#COMPLETE} the single-message read is repeated
	 * only while the previous round emitted a message, so the Flux completes once a
	 * round comes back empty. Otherwise the read is repeated indefinitely.
	 * @return a Flux of the messages read
	 */
	@Override
	public Flux<Message<T>> readMany() {
		return createReactiveReaderAdapter(this.startAtSpec).usingReaderMany((reader) -> {
			Mono<Message<T>> messageMono = readNextMessage(reader, this.endOfStreamAction);
			if (this.endOfStreamAction == EndOfStreamAction.COMPLETE) {
				// The repeatWhen companion receives the number of items emitted by
				// the previous subscription; stop as soon as that count is zero.
				return messageMono.repeatWhen((flux) -> flux.takeWhile((emitted) -> emitted > 0L));
			}
			return messageMono.repeat();
		});
	}

}