blob: abbdce216bc7dc428618c2d191a011a310ee00ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.reactive.client.internal.api;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.jctools.queues.MpmcArrayQueue;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
/**
* Transformer class that limits the number of reactive streams subscription requests to
* keep the number of pending messages under a defined limit.
*
* Subscribing to upstream is postponed if the max inflight limit is reached since
* subscribing to a CompletableFuture is eager. The CompletableFuture will be created at
* subscription time and this demand cannot be controlled with Reactive Streams requests.
* The solution for this is to acquire one slot at subscription time and return this slot
* when a request is made to the subscription. Since it's not possible to backpressure the
* subscription requests, there's a configurable limit for the total number of pending
* subscriptions. Exceeding the limit will cause an IllegalStateException at runtime.
*/
public class InflightLimiter implements PublisherTransformer {
/** Default limit for pending Reactive Stream subscriptions. */
public static final int DEFAULT_MAX_PENDING_SUBSCRIPTIONS = 1024;
/**
 * Bounded queue of subscribers that have downstream demand but no in-flight items;
 * polled by maybeTriggerNext() and drained (cancelling each entry) by dispose().
 */
private final MpmcArrayQueue<InflightLimiterSubscriber<?>> pendingSubscriptions;
/** Number of in-flight slots currently reserved across all subscribers. */
private final AtomicInteger inflight = new AtomicInteger();
/**
 * Incremented in handleSubscribe(); decremented when a subscriber completes, errors or
 * is cancelled.
 */
private final AtomicInteger activeSubscriptions = new AtomicInteger();
/** Upper bound that {@link #inflight} is compared against; validated to be >= 1. */
private final int maxInflight;
/**
 * Lower bound for the divisor used when maxInflight is split between subscriptions in
 * InflightLimiterSubscriber.requestMore().
 */
private final int expectedSubscriptionsInflight;
/** Worker on which requestMore() calls are scheduled; disposed by dispose(). */
private final Scheduler.Worker triggerNextWorker;
/**
 * Set to true (via compareAndSet) right before a drain task is scheduled and reset to
 * false as the first action of that task, so a further drain is not scheduled while one
 * is still waiting to start.
 */
private final AtomicBoolean triggerNextTriggered = new AtomicBoolean(false);
/**
 * Constructs an InflightLimiter with a maximum number of in-flight messages.
 * Delegates to the full constructor with expectedSubscriptionsInflight = 0,
 * {@link Schedulers#single()} as the trigger scheduler and
 * {@link #DEFAULT_MAX_PENDING_SUBSCRIPTIONS} as the pending subscription limit.
 * @param maxInflight the maximum number of in-flight messages
 * @throws IllegalArgumentException if maxInflight is less than 1
 */
public InflightLimiter(int maxInflight) {
    this(maxInflight, 0, Schedulers.single(), DEFAULT_MAX_PENDING_SUBSCRIPTIONS);
}

/**
 * Constructs an InflightLimiter.
 * @param maxInflight the maximum number of in-flight messages
 * @param expectedSubscriptionsInflight the expected number of in-flight
 * subscriptions. Will limit the per-subscription requests to maxInflight / max(active
 * subscriptions, expectedSubscriptionsInflight). Set to 0 to use active subscriptions
 * in the calculation.
 * @param triggerNextScheduler the scheduler on which it will be checked if the
 * subscriber can request more
 * @param maxPendingSubscriptions the maximum number of pending subscriptions
 * @throws IllegalArgumentException if maxInflight is less than 1 or if
 * expectedSubscriptionsInflight is greater than maxInflight
 */
public InflightLimiter(int maxInflight, int expectedSubscriptionsInflight, Scheduler triggerNextScheduler,
        int maxPendingSubscriptions) {
    // Validate every argument before acquiring any resource.
    if (maxInflight < 1) {
        throw new IllegalArgumentException("maxInflight must be greater than 0");
    }
    if (expectedSubscriptionsInflight > maxInflight) {
        throw new IllegalArgumentException(
                "expectedSubscriptionsInflight must be equal or less than maxInflight.");
    }
    this.maxInflight = maxInflight;
    this.expectedSubscriptionsInflight = expectedSubscriptionsInflight;
    // Allocate the queue before the worker: if the queue constructor rejects the
    // requested capacity, no worker has been created yet that would need disposing.
    this.pendingSubscriptions = new MpmcArrayQueue<>(maxPendingSubscriptions);
    // The worker is the only resource that requires disposal, so create it last.
    this.triggerNextWorker = triggerNextScheduler.createWorker();
}
/**
 * Wraps the given publisher with the in-flight limiting operator. A {@link Mono} stays
 * a Mono; any other publisher is adapted with {@link Flux#from(Publisher)} first.
 */
@Override
public <T> Publisher<T> transform(Publisher<T> publisher) {
    if (!(publisher instanceof Mono<?>)) {
        return createOperator(Flux.from(publisher));
    }
    Mono<T> mono = (Mono<T>) publisher;
    return createOperator(mono);
}
/**
 * Creates a Flux operator whose subscribe call is routed through
 * {@link #handleSubscribe(Publisher, CoreSubscriber)}.
 */
<I> Flux<I> createOperator(Flux<I> source) {
    return new FluxOperator<I, I>(source) {
        @Override
        public void subscribe(CoreSubscriber<? super I> downstream) {
            InflightLimiter.this.handleSubscribe(this.source, downstream);
        }
    };
}
/**
 * Creates a Mono operator whose subscribe call is routed through
 * {@link #handleSubscribe(Publisher, CoreSubscriber)}.
 */
<I> Mono<I> createOperator(Mono<I> source) {
    return new MonoOperator<I, I>(source) {
        @Override
        public void subscribe(CoreSubscriber<? super I> downstream) {
            InflightLimiter.this.handleSubscribe(this.source, downstream);
        }
    };
}
/**
 * Registers a new active subscription and hands the limiter's own subscription to the
 * downstream subscriber. The source itself is not subscribed to here.
 */
<I> void handleSubscribe(Publisher<I> source, CoreSubscriber<? super I> actual) {
    this.activeSubscriptions.incrementAndGet();
    Subscription limitedSubscription = new InflightLimiterSubscriber<I>(actual, source).getSubscription();
    actual.onSubscribe(limitedSubscription);
}
/**
 * Schedules a drain of the pending subscriptions on the trigger worker, provided the
 * worker is still alive, there is spare in-flight capacity, something is pending and no
 * other drain is already waiting to start.
 */
void maybeTriggerNext() {
    if (this.triggerNextWorker.isDisposed()) {
        return;
    }
    if (this.inflight.get() >= this.maxInflight) {
        return;
    }
    if (this.pendingSubscriptions.isEmpty()) {
        return;
    }
    if (!this.triggerNextTriggered.compareAndSet(false, true)) {
        return;
    }
    this.triggerNextWorker.schedule(this::drainPendingSubscriptions);
}

/**
 * Polls at most as many subscribers as were pending when the drain started and lets
 * each non-disposed one request more, stopping early once the in-flight limit is
 * reached or the queue turns out to be empty.
 */
private void drainPendingSubscriptions() {
    // Re-arm the trigger first so that signals arriving during the drain can schedule
    // another one.
    this.triggerNextTriggered.set(false);
    for (int budget = this.pendingSubscriptions.size(); budget > 0; budget--) {
        if (this.inflight.get() >= this.maxInflight) {
            return;
        }
        InflightLimiterSubscriber<?> next = this.pendingSubscriptions.poll();
        if (next == null) {
            return;
        }
        if (!next.isDisposed()) {
            next.requestMore();
        }
    }
}
/**
 * Schedules a requestMore() call for the given subscriber on the trigger worker. Does
 * nothing when the worker is disposed; the scheduled task does nothing when the
 * subscriber is disposed by the time it runs.
 */
void scheduleSubscribed(InflightLimiterSubscriber<?> subscriber) {
    if (this.triggerNextWorker.isDisposed()) {
        return;
    }
    this.triggerNextWorker.schedule(() -> {
        if (subscriber.isDisposed()) {
            return;
        }
        subscriber.requestMore();
    });
}
/**
 * Disposes the trigger worker and then cancels every subscriber that is still waiting
 * in the pending queue.
 */
@Override
public void dispose() {
    this.triggerNextWorker.dispose();
    this.pendingSubscriptions.drain((pending) -> pending.cancel());
}
/**
 * Reports the disposal state of the trigger worker, which doubles as the disposal
 * state of this limiter.
 */
@Override
public boolean isDisposed() {
    final Scheduler.Worker worker = this.triggerNextWorker;
    return worker.isDisposed();
}
/**
 * Progress of an InflightLimiterSubscriber with respect to its source subscription.
 */
private enum InflightLimiterSubscriberState {

    /** The source has not been subscribed to yet. */
    INITIAL,

    /** The source is being subscribed to; its onSubscribe has not been handled yet. */
    SUBSCRIBING,

    /** onSubscribe has been handled, but nothing has been requested from the source. */
    SUBSCRIBED,

    /** At least one request has been issued to the source. */
    REQUESTING

}
/**
 * Subscriber placed between one downstream subscriber and its source publisher.
 * Downstream demand is accumulated in {@code requestedDemand} and forwarded to the
 * source in portions by {@link #requestMore()}, subject to the limiter-wide and the
 * per-subscription in-flight limits. The source is subscribed to lazily, on the first
 * {@link #requestMore()} call that passes the demand and limit checks.
 *
 * @param <I> the element type
 */
private class InflightLimiterSubscriber<I> extends BaseSubscriber<I> {

    /** Downstream subscriber that all source signals are forwarded to. */
    private final CoreSubscriber<? super I> actual;

    /** Source publisher; subscribed to from {@link #requestMore()}. */
    private final Publisher<I> source;

    /** Downstream demand that has not been forwarded to the source yet. */
    private final AtomicLong requestedDemand = new AtomicLong();

    /** Progress of the source subscription, see InflightLimiterSubscriberState. */
    private final AtomicReference<InflightLimiterSubscriberState> state = new AtomicReference<>(
            InflightLimiterSubscriberState.INITIAL);

    /** The part of the limiter-wide in-flight count that belongs to this subscriber. */
    private final AtomicInteger inflightForSubscription = new AtomicInteger();

    /** Subscription handed to the downstream subscriber in handleSubscribe(). */
    private final Subscription subscription = new Subscription() {
        @Override
        public void request(long n) {
            addDemand(n);
            maybeAddToPending();
            maybeTriggerNext();
        }

        @Override
        public void cancel() {
            InflightLimiterSubscriber.this.cancel();
        }
    };

    InflightLimiterSubscriber(CoreSubscriber<? super I> actual, Publisher<I> source) {
        this.actual = actual;
        this.source = source;
    }

    /**
     * Adds downstream demand atomically. Long.MAX_VALUE is treated as unbounded, and a
     * sum that would overflow a long is capped at Long.MAX_VALUE instead of wrapping
     * around to a negative value.
     */
    private void addDemand(long n) {
        if (n == Long.MAX_VALUE) {
            this.requestedDemand.set(Long.MAX_VALUE);
            return;
        }
        // NOTE(review): non-positive n is still accumulated as-is, as before; Reactive
        // Streams rule 3.9 would call for an onError signal instead - confirm whether
        // that should be added.
        this.requestedDemand.updateAndGet((current) -> {
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long sum = current + n;
            // adding a positive n can only lower the value when the addition overflowed
            return (n > 0 && sum < current) ? Long.MAX_VALUE : sum;
        });
    }

    @Override
    public Context currentContext() {
        return this.actual.currentContext();
    }

    /**
     * Marks the source subscription as established and schedules the first
     * {@link #requestMore()} call on the limiter's worker.
     */
    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        if (this.state.compareAndSet(InflightLimiterSubscriberState.SUBSCRIBING,
                InflightLimiterSubscriberState.SUBSCRIBED)) {
            scheduleSubscribed(this);
        }
    }

    /**
     * Forwards the element downstream, then releases its in-flight slot and checks
     * whether more can be requested.
     */
    @Override
    protected void hookOnNext(I value) {
        this.actual.onNext(value);
        InflightLimiter.this.inflight.decrementAndGet();
        this.inflightForSubscription.decrementAndGet();
        maybeAddToPending();
        maybeTriggerNext();
    }

    @Override
    protected void hookOnComplete() {
        InflightLimiter.this.activeSubscriptions.decrementAndGet();
        this.actual.onComplete();
        clearInflight();
        maybeTriggerNext();
    }

    /** Returns all slots still held by this subscriber to the limiter-wide count. */
    private void clearInflight() {
        InflightLimiter.this.inflight.addAndGet(-this.inflightForSubscription.getAndSet(0));
    }

    @Override
    protected void hookOnError(Throwable throwable) {
        InflightLimiter.this.activeSubscriptions.decrementAndGet();
        this.actual.onError(throwable);
        clearInflight();
        maybeTriggerNext();
    }

    @Override
    protected void hookOnCancel() {
        InflightLimiter.this.activeSubscriptions.decrementAndGet();
        clearInflight();
        // drop the remaining demand so that this subscriber is not queued again
        this.requestedDemand.set(0);
        maybeTriggerNext();
    }

    Subscription getSubscription() {
        return this.subscription;
    }

    /**
     * Subscribes to the source on first use, or forwards a portion of the accumulated
     * demand to the source. When there is no demand, or the limits leave no room, the
     * subscriber is offered to the pending queue instead.
     */
    void requestMore() {
        // spread requests evenly across active subscriptions (or expected number of
        // subscriptions)
        // The divisor is clamped to 1: activeSubscriptions can reach 0 when the last
        // subscription terminates concurrently with this call, and dividing by zero
        // would throw ArithmeticException on the worker thread.
        int subscriptionCount = Math.max(Math.max(InflightLimiter.this.activeSubscriptions.get(),
                InflightLimiter.this.expectedSubscriptionsInflight), 1);
        int maxInflightForSubscription = Math.max(InflightLimiter.this.maxInflight / subscriptionCount, 1);
        if (this.requestedDemand.get() > 0 && (this.state.get() == InflightLimiterSubscriberState.SUBSCRIBED
                || (this.inflightForSubscription.get() < maxInflightForSubscription
                        && InflightLimiter.this.inflight.get() < InflightLimiter.this.maxInflight))) {
            if (this.state.compareAndSet(InflightLimiterSubscriberState.INITIAL,
                    InflightLimiterSubscriberState.SUBSCRIBING)) {
                // consume one slot for the subscription, since the first element
                // might already be in flight
                // when a CompletableFuture is mapped to a Mono
                InflightLimiter.this.inflight.incrementAndGet();
                this.inflightForSubscription.incrementAndGet();
                this.source.subscribe(InflightLimiterSubscriber.this);
            }
            else if (this.state.get() == InflightLimiterSubscriberState.REQUESTING
                    || this.state.get() == InflightLimiterSubscriberState.SUBSCRIBED) {
                // subscribing changed the values, so adjust back the values on first
                // call
                if (this.state.compareAndSet(InflightLimiterSubscriberState.SUBSCRIBED,
                        InflightLimiterSubscriberState.REQUESTING)) {
                    // reverse the slot reservation made when transitioning from
                    // INITIAL to SUBSCRIBING
                    InflightLimiter.this.inflight.decrementAndGet();
                    this.inflightForSubscription.decrementAndGet();
                }
                // always request at least one element, bounded by the remaining
                // demand and by the room left in this subscription's share
                long maxRequest = Math.max(Math.min(this.requestedDemand.get(),
                        maxInflightForSubscription - this.inflightForSubscription.get()), 1);
                InflightLimiter.this.inflight.addAndGet((int) maxRequest);
                this.requestedDemand.addAndGet(-maxRequest);
                this.inflightForSubscription.addAndGet((int) maxRequest);
                request(maxRequest);
            }
        }
        else {
            maybeAddToPending();
        }
    }

    /**
     * Queues this subscriber for a later {@link #requestMore()} call when it has demand,
     * is not disposed and has nothing in flight. The queue's add is used, so a full
     * queue results in an exception rather than a silently dropped entry.
     */
    void maybeAddToPending() {
        // NOTE(review): nothing here prevents the same subscriber from being queued
        // more than once (e.g. several request calls before the first drain) - confirm
        // that this is intended.
        if (this.requestedDemand.get() > 0 && !isDisposed() && this.inflightForSubscription.get() == 0) {
            InflightLimiter.this.pendingSubscriptions.add(this);
        }
    }

}
}