blob: 528f2d8ce02df393e7fbfbc66c2b96feace7d4e2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
import org.apache.pulsar.common.util.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DispatchRateLimiter {
public enum Type {
TOPIC,
SUBSCRIPTION,
REPLICATOR,
BROKER
}
private final PersistentTopic topic;
private final String topicName;
private final Type type;
private final BrokerService brokerService;
private RateLimiter dispatchRateLimiterOnMessage;
private RateLimiter dispatchRateLimiterOnByte;
public DispatchRateLimiter(PersistentTopic topic, Type type) {
this.topic = topic;
this.topicName = topic.getName();
this.brokerService = topic.getBrokerService();
this.type = type;
updateDispatchRate();
}
public DispatchRateLimiter(BrokerService brokerService) {
this.topic = null;
this.topicName = null;
this.brokerService = brokerService;
this.type = Type.BROKER;
updateDispatchRate();
}
/**
* returns available msg-permit if msg-dispatch-throttling is enabled else it returns -1.
*
* @return
*/
public long getAvailableDispatchRateLimitOnMsg() {
return dispatchRateLimiterOnMessage == null ? -1 : dispatchRateLimiterOnMessage.getAvailablePermits();
}
/**
* returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1.
*
* @return
*/
public long getAvailableDispatchRateLimitOnByte() {
return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits();
}
/**
* It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed.
*
* @param msgPermits
* @param bytePermits
* @return
*/
public boolean tryDispatchPermit(long msgPermits, long bytePermits) {
boolean acquiredMsgPermit = msgPermits <= 0 || dispatchRateLimiterOnMessage == null
// acquiring permits must be < configured msg-rate;
|| dispatchRateLimiterOnMessage.tryAcquire(msgPermits);
boolean acquiredBytePermit = bytePermits <= 0 || dispatchRateLimiterOnByte == null
// acquiring permits must be < configured msg-rate;
|| dispatchRateLimiterOnByte.tryAcquire(bytePermits);
return acquiredMsgPermit && acquiredBytePermit;
}
/**
* checks if dispatch-rate limit is configured and if it's configured then check if permits are available or not.
*
* @return
*/
public boolean hasMessageDispatchPermit() {
return (dispatchRateLimiterOnMessage == null || dispatchRateLimiterOnMessage.getAvailablePermits() > 0)
&& (dispatchRateLimiterOnByte == null || dispatchRateLimiterOnByte.getAvailablePermits() > 0);
}
/**
* Checks if dispatch-rate limiting is enabled.
*
* @return
*/
public boolean isDispatchRateLimitingEnabled() {
return dispatchRateLimiterOnMessage != null || dispatchRateLimiterOnByte != null;
}
/**
* createDispatchRate according to broker service config.
*
* @return
*/
private DispatchRate createDispatchRate() {
int dispatchThrottlingRateInMsg;
long dispatchThrottlingRateInByte;
ServiceConfiguration config = brokerService.pulsar().getConfiguration();
switch (type) {
case TOPIC:
dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerTopicInMsg();
dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerTopicInByte();
break;
case SUBSCRIPTION:
dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerSubscriptionInMsg();
dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerSubscriptionInByte();
break;
case REPLICATOR:
dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerReplicatorInMsg();
dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerReplicatorInByte();
break;
case BROKER:
dispatchThrottlingRateInMsg = config.getDispatchThrottlingRateInMsg();
dispatchThrottlingRateInByte = config.getDispatchThrottlingRateInByte();
break;
default:
dispatchThrottlingRateInMsg = -1;
dispatchThrottlingRateInByte = -1;
}
return DispatchRate.builder()
.dispatchThrottlingRateInMsg(dispatchThrottlingRateInMsg)
.dispatchThrottlingRateInByte(dispatchThrottlingRateInByte)
.ratePeriodInSecond(1)
.relativeToPublishRate(type != Type.BROKER && config.isDispatchThrottlingRateRelativeToPublishRate())
.build();
}
/**
* Update dispatch-throttling-rate.
* Topic-level has the highest priority, then namespace-level, and finally use dispatch-throttling-rate in
* broker-level
*/
public void updateDispatchRate() {
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
if (!dispatchRate.isPresent()) {
getPoliciesDispatchRateAsync(brokerService).thenAccept(dispatchRateOp -> {
if (!dispatchRateOp.isPresent()) {
dispatchRateOp = Optional.of(createDispatchRate());
}
updateDispatchRate(dispatchRateOp.get());
if (type == Type.BROKER) {
log.info("configured broker message-dispatch rate {}", dispatchRate.get());
} else {
log.info("[{}] configured {} message-dispatch rate at broker {}",
this.topicName, type, dispatchRate.get());
}
}).exceptionally(ex -> {
log.error("[{}] failed to get the dispatch rate policy from the namespace resource for type {}",
topicName, type, ex);
return null;
});
} else {
updateDispatchRate(dispatchRate.get());
log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type, dispatchRate.get());
}
}
public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
String topicName, Type type) {
final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
if (type == Type.BROKER) {
return brokerService.getBrokerDispatchRateLimiter().isDispatchRateLimitingEnabled();
}
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
if (dispatchRate.isPresent()) {
return true;
}
policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName);
return isDispatchRateNeeded(serviceConfig, policies, topicName, type);
}
public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService brokerService,
String topicName, Type type) {
Optional<DispatchRate> dispatchRate = Optional.empty();
final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) {
try {
switch (type) {
case TOPIC:
dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getDispatchRate);
break;
case SUBSCRIPTION:
dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getSubscriptionDispatchRate);
break;
case REPLICATOR:
dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getReplicatorDispatchRate);
break;
default:
break;
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies have not been initialized yet.", topicName);
} catch (Exception e) {
log.debug("[{}] Failed to get topic dispatch rate. ", topicName, e);
}
}
return dispatchRate;
}
public static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig,
final Optional<Policies> policies, final String topicName, final Type type) {
DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, type);
if (dispatchRate == null) {
switch (type) {
case TOPIC:
return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0
|| serviceConfig.getDispatchThrottlingRatePerTopicInByte() > 0;
case SUBSCRIPTION:
return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0
|| serviceConfig.getDispatchThrottlingRatePerSubscriptionInByte() > 0;
case REPLICATOR:
return serviceConfig.getDispatchThrottlingRatePerReplicatorInMsg() > 0
|| serviceConfig.getDispatchThrottlingRatePerReplicatorInByte() > 0;
default:
log.error("error DispatchRateLimiter type: {} ", type);
return false;
}
}
return true;
}
@SuppressWarnings("deprecation")
public void onPoliciesUpdate(Policies data) {
String cluster = brokerService.pulsar().getConfiguration().getClusterName();
DispatchRate dispatchRate;
switch (type) {
case TOPIC:
dispatchRate = data.topicDispatchRate.get(cluster);
if (dispatchRate == null) {
dispatchRate = data.clusterDispatchRate.get(cluster);
}
break;
case SUBSCRIPTION:
dispatchRate = data.subscriptionDispatchRate.get(cluster);
break;
case REPLICATOR:
dispatchRate = data.replicatorDispatchRate.get(cluster);
break;
default:
log.error("error DispatchRateLimiter type: {} ", type);
dispatchRate = null;
}
// update dispatch-rate only if it's configured in policies else ignore
if (dispatchRate != null) {
final DispatchRate newDispatchRate = createDispatchRate();
// if policy-throttling rate is disabled and cluster-throttling is enabled then apply
// cluster-throttling rate
if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) {
dispatchRate = newDispatchRate;
}
updateDispatchRate(dispatchRate);
}
}
@SuppressWarnings("deprecation")
public static DispatchRateImpl getPoliciesDispatchRate(final String cluster,
Optional<Policies> policies,
Type type) {
// return policy-dispatch rate only if it's enabled in policies
return policies.map(p -> {
DispatchRateImpl dispatchRate;
switch (type) {
case TOPIC:
dispatchRate = p.topicDispatchRate.get(cluster);
if (dispatchRate == null) {
dispatchRate = p.clusterDispatchRate.get(cluster);
}
break;
case SUBSCRIPTION:
dispatchRate = p.subscriptionDispatchRate.get(cluster);
break;
case REPLICATOR:
dispatchRate = p.replicatorDispatchRate.get(cluster);
break;
default:
log.error("error DispatchRateLimiter type: {} ", type);
return null;
}
return isDispatchRateEnabled(dispatchRate) ? dispatchRate : null;
}).orElse(null);
}
/**
* Gets configured dispatch-rate from namespace policies. Returns null if dispatch-rate is not configured
*
* @return
*/
public CompletableFuture<Optional<DispatchRate>> getPoliciesDispatchRateAsync(BrokerService brokerService) {
if (topicName == null) {
return CompletableFuture.completedFuture(Optional.empty());
}
final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
return getPoliciesAsync(brokerService, topicName).thenApply(policiesOp ->
Optional.ofNullable(getPoliciesDispatchRate(cluster, policiesOp, type)));
}
public static CompletableFuture<Optional<Policies>> getPoliciesAsync(BrokerService brokerService,
String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace);
}
public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) {
final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject();
return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace);
}
/**
* Update dispatch rate by updating msg and byte rate-limiter. If dispatch-rate is configured < 0 then it closes
* the rate-limiter and disables appropriate rate-limiter.
*
* @param dispatchRate
*/
public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
// synchronized to prevent race condition from concurrent zk-watch
log.info("setting message-dispatch-rate {}", dispatchRate);
long msgRate = dispatchRate.getDispatchThrottlingRateInMsg();
long byteRate = dispatchRate.getDispatchThrottlingRateInByte();
long ratePeriod = dispatchRate.getRatePeriodInSecond();
Supplier<Long> permitUpdaterMsg = dispatchRate.isRelativeToPublishRate()
? () -> getRelativeDispatchRateInMsg(dispatchRate)
: null;
// update msg-rateLimiter
if (msgRate > 0) {
if (this.dispatchRateLimiterOnMessage == null) {
this.dispatchRateLimiterOnMessage =
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(msgRate)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdaterMsg)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
} else {
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.getRatePeriodInSecond(),
TimeUnit.SECONDS, permitUpdaterMsg);
}
} else {
// message-rate should be disable and close
if (this.dispatchRateLimiterOnMessage != null) {
this.dispatchRateLimiterOnMessage.close();
this.dispatchRateLimiterOnMessage = null;
}
}
Supplier<Long> permitUpdaterByte = dispatchRate.isRelativeToPublishRate()
? () -> getRelativeDispatchRateInByte(dispatchRate)
: null;
// update byte-rateLimiter
if (byteRate > 0) {
if (this.dispatchRateLimiterOnByte == null) {
this.dispatchRateLimiterOnByte =
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(byteRate)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdaterByte)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
} else {
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.getRatePeriodInSecond(),
TimeUnit.SECONDS, permitUpdaterByte);
}
} else {
// message-rate should be disable and close
if (this.dispatchRateLimiterOnByte != null) {
this.dispatchRateLimiterOnByte.close();
this.dispatchRateLimiterOnByte = null;
}
}
}
private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) {
return (topic != null && dispatchRate != null)
? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.getDispatchThrottlingRateInMsg()
: 0;
}
private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) {
return (topic != null && dispatchRate != null)
? (long) topic.getLastUpdatedAvgPublishRateInByte() + dispatchRate.getDispatchThrottlingRateInByte()
: 0;
}
/**
* Get configured msg dispatch-throttling rate. Returns -1 if not configured
*
* @return
*/
public long getDispatchRateOnMsg() {
return dispatchRateLimiterOnMessage != null ? dispatchRateLimiterOnMessage.getRate() : -1;
}
/**
* Get configured byte dispatch-throttling rate. Returns -1 if not configured
*
* @return
*/
public long getDispatchRateOnByte() {
return dispatchRateLimiterOnByte != null ? dispatchRateLimiterOnByte.getRate() : -1;
}
private static boolean isDispatchRateEnabled(DispatchRate dispatchRate) {
return dispatchRate != null && (dispatchRate.getDispatchThrottlingRateInMsg() > 0
|| dispatchRate.getDispatchThrottlingRateInByte() > 0);
}
public void close() {
// close rate-limiter
if (dispatchRateLimiterOnMessage != null) {
dispatchRateLimiterOnMessage.close();
dispatchRateLimiterOnMessage = null;
}
if (dispatchRateLimiterOnByte != null) {
dispatchRateLimiterOnByte.close();
dispatchRateLimiterOnByte = null;
}
}
private static final Logger log = LoggerFactory.getLogger(DispatchRateLimiter.class);
}