blob: b1de10e73b76f3ba76d6114aedca76e8573fe30c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;
import com.google.common.base.MoreObjects;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SubscribeRateLimiter {
private final String topicName;
private final BrokerService brokerService;
private ConcurrentHashMap<ConsumerIdentifier, AsyncTokenBucket> subscribeRateLimiter;
private SubscribeRate subscribeRate;
public SubscribeRateLimiter(PersistentTopic topic) {
this.topicName = topic.getName();
this.brokerService = topic.getBrokerService();
subscribeRateLimiter = new ConcurrentHashMap<>();
// get subscribeRate from topic level policies
this.subscribeRate = topic.getSubscribeRate();
if (isSubscribeRateEnabled(this.subscribeRate)) {
log.info("[{}] configured subscribe-dispatch rate at broker {}", this.topicName, subscribeRate);
}
}
/**
* returns available subscribes if subscribe-throttling is enabled else it returns -1.
*
* @return
*/
public long getAvailableSubscribeRateLimit(ConsumerIdentifier consumerIdentifier) {
return subscribeRateLimiter.get(consumerIdentifier)
== null ? -1 : subscribeRateLimiter.get(consumerIdentifier).getTokens();
}
/**
* It acquires subscribe from subscribe-limiter and returns if acquired permits succeed.
*
* @return
*/
public synchronized boolean tryAcquire(ConsumerIdentifier consumerIdentifier) {
addSubscribeLimiterIfAbsent(consumerIdentifier);
AsyncTokenBucket tokenBucket = subscribeRateLimiter.get(consumerIdentifier);
if (tokenBucket == null) {
return true;
}
if (!tokenBucket.containsTokens(true)) {
return false;
}
tokenBucket.consumeTokens(1);
return true;
}
/**
* checks if subscribe-rate limit is configured and if it's configured then check if
* subscribe are available or not.
*
* @return
*/
public boolean subscribeAvailable(ConsumerIdentifier consumerIdentifier) {
return (subscribeRateLimiter.get(consumerIdentifier)
== null || subscribeRateLimiter.get(consumerIdentifier).containsTokens());
}
/**
* Update subscribe-throttling-rate. gives first priority to
* namespace-policy configured subscribe rate else applies
* default broker subscribe-throttling-rate
*/
private synchronized void addSubscribeLimiterIfAbsent(ConsumerIdentifier consumerIdentifier) {
if (subscribeRateLimiter.get(consumerIdentifier) != null || !isSubscribeRateEnabled(this.subscribeRate)) {
return;
}
updateSubscribeRate(consumerIdentifier, this.subscribeRate);
}
private synchronized void removeSubscribeLimiter(ConsumerIdentifier consumerIdentifier) {
this.subscribeRateLimiter.remove(consumerIdentifier);
}
/**
* Update subscribe rate by updating rate-limiter. If subscribe-rate is configured < 0 then it closes
* the rate-limiter and disables appropriate rate-limiter.
*
* @param subscribeRate
*/
private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentifier, SubscribeRate subscribeRate) {
long ratePerConsumer = subscribeRate.subscribeThrottlingRatePerConsumer;
long ratePeriodNanos = TimeUnit.SECONDS.toNanos(Math.max(subscribeRate.ratePeriodInSecond, 1));
// update subscribe-rateLimiter
if (ratePerConsumer > 0) {
AsyncTokenBucket tokenBucket =
AsyncTokenBucket.builder().rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build();
this.subscribeRateLimiter.put(consumerIdentifier, tokenBucket);
} else {
// subscribe-rate should be disable and close
removeSubscribeLimiter(consumerIdentifier);
}
}
public void onSubscribeRateUpdate(SubscribeRate subscribeRate) {
if (this.subscribeRate.equals(subscribeRate)) {
return;
}
this.subscribeRate = subscribeRate;
for (ConsumerIdentifier consumerIdentifier : this.subscribeRateLimiter.keySet()) {
if (!isSubscribeRateEnabled(this.subscribeRate)) {
removeSubscribeLimiter(consumerIdentifier);
} else {
updateSubscribeRate(consumerIdentifier, subscribeRate);
}
}
if (isSubscribeRateEnabled(this.subscribeRate)) {
log.info("[{}] configured subscribe-dispatch rate at broker {}", this.topicName, subscribeRate);
}
}
/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call.
*/
@Deprecated
public SubscribeRate getPoliciesSubscribeRate() {
return getPoliciesSubscribeRate(brokerService, topicName);
}
/**
* @deprecated Avoid using the deprecated method
* #{@link org.apache.pulsar.broker.resources.NamespaceResources#getPoliciesIfCached(NamespaceName)} and blocking
* call.
*/
@Deprecated
public static SubscribeRate getPoliciesSubscribeRate(BrokerService brokerService, final String topicName) {
final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
final Optional<Policies> policies = DispatchRateLimiter.getPolicies(brokerService, topicName);
return getPoliciesSubscribeRate(cluster, policies, topicName);
}
public static SubscribeRate getPoliciesSubscribeRate(final String cluster, final Optional<Policies> policies,
String topicName) {
// return policy-subscribe rate only if it's enabled in policies
return policies.map(p -> {
if (p.clusterSubscribeRate != null) {
SubscribeRate subscribeRate = p.clusterSubscribeRate.get(cluster);
return isSubscribeRateEnabled(subscribeRate) ? subscribeRate : null;
} else {
return null;
}
}).orElse(null);
}
/**
* Get configured msg subscribe-throttling rate. Returns -1 if not configured
*
* @return
*/
public long getSubscribeRatePerConsumer(ConsumerIdentifier consumerIdentifier) {
return subscribeRateLimiter.get(consumerIdentifier)
!= null ? subscribeRateLimiter.get(consumerIdentifier).getRate() : -1;
}
public static boolean isSubscribeRateEnabled(SubscribeRate subscribeRate) {
return subscribeRate.subscribeThrottlingRatePerConsumer > 0 && subscribeRate.ratePeriodInSecond > 0;
}
public void close() {
}
public SubscribeRate getSubscribeRate() {
return subscribeRate;
}
public static class ConsumerIdentifier {
private String host;
private String consumerName;
private long consumerId;
public ConsumerIdentifier(String host, String consumerName, long consumerId) {
this.host = host;
this.consumerName = consumerName;
this.consumerId = consumerId;
}
@Override
public int hashCode() {
return Objects.hash(host, consumerName, consumerId);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ConsumerIdentifier) {
ConsumerIdentifier consumer = (ConsumerIdentifier) obj;
return Objects.equals(host, consumer.host)
&& Objects.equals(consumerName, consumer.consumerName)
&& Objects.equals(consumerId, consumer.consumerId);
}
return false;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("host", host)
.add("consumerName", consumerName)
.add("consumerId", consumerId).toString();
}
}
private static final Logger log = LoggerFactory.getLogger(SubscribeRateLimiter.class);
}