| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service.persistent; |
| |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Supplier; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.service.BrokerService; |
| import org.apache.pulsar.broker.service.BrokerServiceException; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.DispatchRate; |
| import org.apache.pulsar.common.policies.data.Policies; |
| import org.apache.pulsar.common.policies.data.TopicPolicies; |
| import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; |
| import org.apache.pulsar.common.util.RateLimiter; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class DispatchRateLimiter { |
| |
| public enum Type { |
| TOPIC, |
| SUBSCRIPTION, |
| REPLICATOR, |
| BROKER |
| } |
| |
| private final PersistentTopic topic; |
| private final String topicName; |
| private final Type type; |
| |
| private final BrokerService brokerService; |
| private RateLimiter dispatchRateLimiterOnMessage; |
| private RateLimiter dispatchRateLimiterOnByte; |
| |
| public DispatchRateLimiter(PersistentTopic topic, Type type) { |
| this.topic = topic; |
| this.topicName = topic.getName(); |
| this.brokerService = topic.getBrokerService(); |
| this.type = type; |
| updateDispatchRate(); |
| } |
| |
| public DispatchRateLimiter(BrokerService brokerService) { |
| this.topic = null; |
| this.topicName = null; |
| this.brokerService = brokerService; |
| this.type = Type.BROKER; |
| updateDispatchRate(); |
| } |
| |
| /** |
| * returns available msg-permit if msg-dispatch-throttling is enabled else it returns -1. |
| * |
| * @return |
| */ |
| public long getAvailableDispatchRateLimitOnMsg() { |
| return dispatchRateLimiterOnMessage == null ? -1 : dispatchRateLimiterOnMessage.getAvailablePermits(); |
| } |
| |
| /** |
| * returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1. |
| * |
| * @return |
| */ |
| public long getAvailableDispatchRateLimitOnByte() { |
| return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits(); |
| } |
| |
| /** |
| * It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed. |
| * |
| * @param msgPermits |
| * @param bytePermits |
| * @return |
| */ |
| public boolean tryDispatchPermit(long msgPermits, long bytePermits) { |
| boolean acquiredMsgPermit = msgPermits <= 0 || dispatchRateLimiterOnMessage == null |
| // acquiring permits must be < configured msg-rate; |
| || dispatchRateLimiterOnMessage.tryAcquire(msgPermits); |
| boolean acquiredBytePermit = bytePermits <= 0 || dispatchRateLimiterOnByte == null |
| // acquiring permits must be < configured msg-rate; |
| || dispatchRateLimiterOnByte.tryAcquire(bytePermits); |
| return acquiredMsgPermit && acquiredBytePermit; |
| } |
| |
| /** |
| * checks if dispatch-rate limit is configured and if it's configured then check if permits are available or not. |
| * |
| * @return |
| */ |
| public boolean hasMessageDispatchPermit() { |
| return (dispatchRateLimiterOnMessage == null || dispatchRateLimiterOnMessage.getAvailablePermits() > 0) |
| && (dispatchRateLimiterOnByte == null || dispatchRateLimiterOnByte.getAvailablePermits() > 0); |
| } |
| |
| /** |
| * Checks if dispatch-rate limiting is enabled. |
| * |
| * @return |
| */ |
| public boolean isDispatchRateLimitingEnabled() { |
| return dispatchRateLimiterOnMessage != null || dispatchRateLimiterOnByte != null; |
| } |
| |
| /** |
| * createDispatchRate according to broker service config. |
| * |
| * @return |
| */ |
| private DispatchRate createDispatchRate() { |
| int dispatchThrottlingRateInMsg; |
| long dispatchThrottlingRateInByte; |
| ServiceConfiguration config = brokerService.pulsar().getConfiguration(); |
| |
| switch (type) { |
| case TOPIC: |
| dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerTopicInMsg(); |
| dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerTopicInByte(); |
| break; |
| case SUBSCRIPTION: |
| dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerSubscriptionInMsg(); |
| dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerSubscriptionInByte(); |
| break; |
| case REPLICATOR: |
| dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerReplicatorInMsg(); |
| dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerReplicatorInByte(); |
| break; |
| case BROKER: |
| dispatchThrottlingRateInMsg = config.getDispatchThrottlingRateInMsg(); |
| dispatchThrottlingRateInByte = config.getDispatchThrottlingRateInByte(); |
| break; |
| default: |
| dispatchThrottlingRateInMsg = -1; |
| dispatchThrottlingRateInByte = -1; |
| } |
| |
| |
| return DispatchRate.builder() |
| .dispatchThrottlingRateInMsg(dispatchThrottlingRateInMsg) |
| .dispatchThrottlingRateInByte(dispatchThrottlingRateInByte) |
| .ratePeriodInSecond(1) |
| .relativeToPublishRate(type != Type.BROKER && config.isDispatchThrottlingRateRelativeToPublishRate()) |
| .build(); |
| } |
| |
| /** |
| * Update dispatch-throttling-rate. |
| * Topic-level has the highest priority, then namespace-level, and finally use dispatch-throttling-rate in |
| * broker-level |
| */ |
| public void updateDispatchRate() { |
| Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type); |
| if (!dispatchRate.isPresent()) { |
| getPoliciesDispatchRateAsync(brokerService).thenAccept(dispatchRateOp -> { |
| if (!dispatchRateOp.isPresent()) { |
| dispatchRateOp = Optional.of(createDispatchRate()); |
| } |
| updateDispatchRate(dispatchRateOp.get()); |
| if (type == Type.BROKER) { |
| log.info("configured broker message-dispatch rate {}", dispatchRate.get()); |
| } else { |
| log.info("[{}] configured {} message-dispatch rate at broker {}", |
| this.topicName, type, dispatchRate.get()); |
| } |
| }).exceptionally(ex -> { |
| log.error("[{}] failed to get the dispatch rate policy from the namespace resource for type {}", |
| topicName, type, ex); |
| return null; |
| }); |
| } else { |
| updateDispatchRate(dispatchRate.get()); |
| log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type, dispatchRate.get()); |
| } |
| } |
| |
| public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies, |
| String topicName, Type type) { |
| final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration(); |
| if (type == Type.BROKER) { |
| return brokerService.getBrokerDispatchRateLimiter().isDispatchRateLimitingEnabled(); |
| } |
| |
| Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type); |
| if (dispatchRate.isPresent()) { |
| return true; |
| } |
| |
| policies = policies.isPresent() ? policies : getPolicies(brokerService, topicName); |
| return isDispatchRateNeeded(serviceConfig, policies, topicName, type); |
| } |
| |
| public static Optional<DispatchRate> getTopicPolicyDispatchRate(BrokerService brokerService, |
| String topicName, Type type) { |
| Optional<DispatchRate> dispatchRate = Optional.empty(); |
| final ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration(); |
| if (serviceConfiguration.isSystemTopicEnabled() && serviceConfiguration.isTopicLevelPoliciesEnabled()) { |
| try { |
| switch (type) { |
| case TOPIC: |
| dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService() |
| .getTopicPolicies(TopicName.get(topicName))) |
| .map(TopicPolicies::getDispatchRate); |
| break; |
| case SUBSCRIPTION: |
| dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService() |
| .getTopicPolicies(TopicName.get(topicName))) |
| .map(TopicPolicies::getSubscriptionDispatchRate); |
| break; |
| case REPLICATOR: |
| dispatchRate = Optional.ofNullable(brokerService.pulsar().getTopicPoliciesService() |
| .getTopicPolicies(TopicName.get(topicName))) |
| .map(TopicPolicies::getReplicatorDispatchRate); |
| break; |
| default: |
| break; |
| } |
| } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { |
| log.debug("Topic {} policies have not been initialized yet.", topicName); |
| } catch (Exception e) { |
| log.debug("[{}] Failed to get topic dispatch rate. ", topicName, e); |
| } |
| } |
| |
| return dispatchRate; |
| } |
| |
| public static boolean isDispatchRateNeeded(final ServiceConfiguration serviceConfig, |
| final Optional<Policies> policies, final String topicName, final Type type) { |
| DispatchRate dispatchRate = getPoliciesDispatchRate(serviceConfig.getClusterName(), policies, type); |
| if (dispatchRate == null) { |
| switch (type) { |
| case TOPIC: |
| return serviceConfig.getDispatchThrottlingRatePerTopicInMsg() > 0 |
| || serviceConfig.getDispatchThrottlingRatePerTopicInByte() > 0; |
| case SUBSCRIPTION: |
| return serviceConfig.getDispatchThrottlingRatePerSubscriptionInMsg() > 0 |
| || serviceConfig.getDispatchThrottlingRatePerSubscriptionInByte() > 0; |
| case REPLICATOR: |
| return serviceConfig.getDispatchThrottlingRatePerReplicatorInMsg() > 0 |
| || serviceConfig.getDispatchThrottlingRatePerReplicatorInByte() > 0; |
| default: |
| log.error("error DispatchRateLimiter type: {} ", type); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| @SuppressWarnings("deprecation") |
| public void onPoliciesUpdate(Policies data) { |
| String cluster = brokerService.pulsar().getConfiguration().getClusterName(); |
| |
| DispatchRate dispatchRate; |
| |
| switch (type) { |
| case TOPIC: |
| dispatchRate = data.topicDispatchRate.get(cluster); |
| if (dispatchRate == null) { |
| dispatchRate = data.clusterDispatchRate.get(cluster); |
| } |
| break; |
| case SUBSCRIPTION: |
| dispatchRate = data.subscriptionDispatchRate.get(cluster); |
| break; |
| case REPLICATOR: |
| dispatchRate = data.replicatorDispatchRate.get(cluster); |
| break; |
| default: |
| log.error("error DispatchRateLimiter type: {} ", type); |
| dispatchRate = null; |
| } |
| |
| // update dispatch-rate only if it's configured in policies else ignore |
| if (dispatchRate != null) { |
| final DispatchRate newDispatchRate = createDispatchRate(); |
| |
| // if policy-throttling rate is disabled and cluster-throttling is enabled then apply |
| // cluster-throttling rate |
| if (!isDispatchRateEnabled(dispatchRate) && isDispatchRateEnabled(newDispatchRate)) { |
| dispatchRate = newDispatchRate; |
| } |
| updateDispatchRate(dispatchRate); |
| } |
| } |
| |
| @SuppressWarnings("deprecation") |
| public static DispatchRateImpl getPoliciesDispatchRate(final String cluster, |
| Optional<Policies> policies, |
| Type type) { |
| // return policy-dispatch rate only if it's enabled in policies |
| return policies.map(p -> { |
| DispatchRateImpl dispatchRate; |
| switch (type) { |
| case TOPIC: |
| dispatchRate = p.topicDispatchRate.get(cluster); |
| if (dispatchRate == null) { |
| dispatchRate = p.clusterDispatchRate.get(cluster); |
| } |
| break; |
| case SUBSCRIPTION: |
| dispatchRate = p.subscriptionDispatchRate.get(cluster); |
| break; |
| case REPLICATOR: |
| dispatchRate = p.replicatorDispatchRate.get(cluster); |
| break; |
| default: |
| log.error("error DispatchRateLimiter type: {} ", type); |
| return null; |
| } |
| return isDispatchRateEnabled(dispatchRate) ? dispatchRate : null; |
| }).orElse(null); |
| } |
| |
| |
| /** |
| * Gets configured dispatch-rate from namespace policies. Returns null if dispatch-rate is not configured |
| * |
| * @return |
| */ |
| public CompletableFuture<Optional<DispatchRate>> getPoliciesDispatchRateAsync(BrokerService brokerService) { |
| if (topicName == null) { |
| return CompletableFuture.completedFuture(Optional.empty()); |
| } |
| final String cluster = brokerService.pulsar().getConfiguration().getClusterName(); |
| return getPoliciesAsync(brokerService, topicName).thenApply(policiesOp -> |
| Optional.ofNullable(getPoliciesDispatchRate(cluster, policiesOp, type))); |
| } |
| |
| public static CompletableFuture<Optional<Policies>> getPoliciesAsync(BrokerService brokerService, |
| String topicName) { |
| final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); |
| return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace); |
| } |
| |
| public static Optional<Policies> getPolicies(BrokerService brokerService, String topicName) { |
| final NamespaceName namespace = TopicName.get(topicName).getNamespaceObject(); |
| return brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached(namespace); |
| } |
| |
| /** |
| * Update dispatch rate by updating msg and byte rate-limiter. If dispatch-rate is configured < 0 then it closes |
| * the rate-limiter and disables appropriate rate-limiter. |
| * |
| * @param dispatchRate |
| */ |
| public synchronized void updateDispatchRate(DispatchRate dispatchRate) { |
| // synchronized to prevent race condition from concurrent zk-watch |
| log.info("setting message-dispatch-rate {}", dispatchRate); |
| |
| long msgRate = dispatchRate.getDispatchThrottlingRateInMsg(); |
| long byteRate = dispatchRate.getDispatchThrottlingRateInByte(); |
| long ratePeriod = dispatchRate.getRatePeriodInSecond(); |
| |
| Supplier<Long> permitUpdaterMsg = dispatchRate.isRelativeToPublishRate() |
| ? () -> getRelativeDispatchRateInMsg(dispatchRate) |
| : null; |
| // update msg-rateLimiter |
| if (msgRate > 0) { |
| if (this.dispatchRateLimiterOnMessage == null) { |
| this.dispatchRateLimiterOnMessage = |
| RateLimiter.builder() |
| .scheduledExecutorService(brokerService.pulsar().getExecutor()) |
| .permits(msgRate) |
| .rateTime(ratePeriod) |
| .timeUnit(TimeUnit.SECONDS) |
| .permitUpdater(permitUpdaterMsg) |
| .isDispatchOrPrecisePublishRateLimiter(true) |
| .build(); |
| } else { |
| this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.getRatePeriodInSecond(), |
| TimeUnit.SECONDS, permitUpdaterMsg); |
| } |
| } else { |
| // message-rate should be disable and close |
| if (this.dispatchRateLimiterOnMessage != null) { |
| this.dispatchRateLimiterOnMessage.close(); |
| this.dispatchRateLimiterOnMessage = null; |
| } |
| } |
| |
| Supplier<Long> permitUpdaterByte = dispatchRate.isRelativeToPublishRate() |
| ? () -> getRelativeDispatchRateInByte(dispatchRate) |
| : null; |
| // update byte-rateLimiter |
| if (byteRate > 0) { |
| if (this.dispatchRateLimiterOnByte == null) { |
| this.dispatchRateLimiterOnByte = |
| RateLimiter.builder() |
| .scheduledExecutorService(brokerService.pulsar().getExecutor()) |
| .permits(byteRate) |
| .rateTime(ratePeriod) |
| .timeUnit(TimeUnit.SECONDS) |
| .permitUpdater(permitUpdaterByte) |
| .isDispatchOrPrecisePublishRateLimiter(true) |
| .build(); |
| } else { |
| this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.getRatePeriodInSecond(), |
| TimeUnit.SECONDS, permitUpdaterByte); |
| } |
| } else { |
| // message-rate should be disable and close |
| if (this.dispatchRateLimiterOnByte != null) { |
| this.dispatchRateLimiterOnByte.close(); |
| this.dispatchRateLimiterOnByte = null; |
| } |
| } |
| } |
| |
| private long getRelativeDispatchRateInMsg(DispatchRate dispatchRate) { |
| return (topic != null && dispatchRate != null) |
| ? (long) topic.getLastUpdatedAvgPublishRateInMsg() + dispatchRate.getDispatchThrottlingRateInMsg() |
| : 0; |
| } |
| |
| private long getRelativeDispatchRateInByte(DispatchRate dispatchRate) { |
| return (topic != null && dispatchRate != null) |
| ? (long) topic.getLastUpdatedAvgPublishRateInByte() + dispatchRate.getDispatchThrottlingRateInByte() |
| : 0; |
| } |
| |
| /** |
| * Get configured msg dispatch-throttling rate. Returns -1 if not configured |
| * |
| * @return |
| */ |
| public long getDispatchRateOnMsg() { |
| return dispatchRateLimiterOnMessage != null ? dispatchRateLimiterOnMessage.getRate() : -1; |
| } |
| |
| /** |
| * Get configured byte dispatch-throttling rate. Returns -1 if not configured |
| * |
| * @return |
| */ |
| public long getDispatchRateOnByte() { |
| return dispatchRateLimiterOnByte != null ? dispatchRateLimiterOnByte.getRate() : -1; |
| } |
| |
| |
| private static boolean isDispatchRateEnabled(DispatchRate dispatchRate) { |
| return dispatchRate != null && (dispatchRate.getDispatchThrottlingRateInMsg() > 0 |
| || dispatchRate.getDispatchThrottlingRateInByte() > 0); |
| } |
| |
| public void close() { |
| // close rate-limiter |
| if (dispatchRateLimiterOnMessage != null) { |
| dispatchRateLimiterOnMessage.close(); |
| dispatchRateLimiterOnMessage = null; |
| } |
| if (dispatchRateLimiterOnByte != null) { |
| dispatchRateLimiterOnByte.close(); |
| dispatchRateLimiterOnByte = null; |
| } |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(DispatchRateLimiter.class); |
| } |