| # PIP-406: Introduce metrics related to dispatch throttled events |
| |
| # Background knowledge |
| |
| ## Motivation |
| |
| Currently, users can monitor subscription backlogs using the `pulsar_subscription_back_log_no_delayed` metric. |
| However, if [dispatch throttling](https://pulsar.apache.org/docs/next/concepts-throttling/) is configured at the broker, topic, or subscription level, |
| this metric alone cannot show whether a backlog is caused by insufficient consumer capacity or by dispatch throttling. |
| |
| ## Goals |
| |
| Introduce metrics that count how many times message or byte dispatching was throttled by a **broker/topic/subscription** level rate limit. |
| This allows users to write PromQL queries to identify subscriptions with high backlogs but low or no throttling, pinpointing backlogs caused by insufficient consumer capacity. |
| |
| ## In Scope |
| |
| - Introduce the metric `pulsar_subscription_dispatch_throttled_msg_events` to represent the total number of times message dispatching was throttled on a subscription |
| - Introduce the metric `pulsar_subscription_dispatch_throttled_bytes_events` to represent the total number of times byte dispatching was throttled on a subscription |
| |
| Since throttling on a subscription can be caused by any of several rate limiters (broker, topic, or subscription), |
| these metrics include a `reason` label (`broker`, `topic`, or `subscription`) to indicate which limiter is responsible. |
| |
| ## Out of Scope |
| - These counters are not persisted; they reset on broker restart, topic reload, or subscription reconnection. |
| |
| # High Level Design |
| 1. Maintain counters for throttling caused by `broker/topic/subscription` rate limits. Increment the appropriate counter whenever throttling occurs. |
| 2. Expose these counters when metrics are retrieved. |
| |
| |
| # Detailed Design |
| |
| ## Design & Implementation Details |
| 1. Maintain these fields in `AbstractBaseDispatcher`. |
| ```java |
| public final LongAdder dispatchThrottledMsgEventsBySubscriptionLimit = new LongAdder(); |
| public final LongAdder dispatchThrottledMsgEventsByTopicLimit = new LongAdder(); |
| public final LongAdder dispatchThrottledMsgEventsByBrokerLimit = new LongAdder(); |
| public final LongAdder dispatchThrottledBytesEventsBySubscriptionLimit = new LongAdder(); |
| public final LongAdder dispatchThrottledBytesEventsByTopicLimit = new LongAdder(); |
| public final LongAdder dispatchThrottledBytesEventsByBrokerLimit = new LongAdder(); |
| ``` |
| |
| 2. Each time a read occurs, if a rate limiter reduces the number of messages or bytes to read, increment the corresponding counter by one. |
| ```diff |
|  private boolean applyDispatchRateLimitsToReadLimits(DispatchRateLimiter rateLimiter, |
|                                                      MutablePair<Integer, Long> readLimits, |
|                                                      DispatchRateLimiter.Type limiterType) { |
| +    int originalMessagesToRead = readLimits.getLeft(); |
| +    long originalBytesToRead = readLimits.getRight(); |
|      int availablePermitsOnMsg = (int) rateLimiter.getAvailableDispatchRateLimitOnMsg(); |
|      if (availablePermitsOnMsg >= 0) { |
|          readLimits.setLeft(Math.min(readLimits.getLeft(), availablePermitsOnMsg)); |
|      } |
|      long availablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte(); |
|      if (availablePermitsOnByte >= 0) { |
|          readLimits.setRight(Math.min(readLimits.getRight(), availablePermitsOnByte)); |
|      } |
| +    if (readLimits.getLeft() < originalMessagesToRead) { |
| +        switch (limiterType) { |
| +            case BROKER -> dispatchThrottledMsgEventsByBrokerLimit.increment(); |
| +            case TOPIC -> dispatchThrottledMsgEventsByTopicLimit.increment(); |
| +            case SUBSCRIPTION -> dispatchThrottledMsgEventsBySubscriptionLimit.increment(); |
| +            default -> {} |
| +        } |
| +    } |
| +    if (readLimits.getRight() < originalBytesToRead) { |
| +        switch (limiterType) { |
| +            case BROKER -> dispatchThrottledBytesEventsByBrokerLimit.increment(); |
| +            case TOPIC -> dispatchThrottledBytesEventsByTopicLimit.increment(); |
| +            case SUBSCRIPTION -> dispatchThrottledBytesEventsBySubscriptionLimit.increment(); |
| +            default -> {} |
| +        } |
| +    } |
| ``` |
| 3. Emit these metrics when metrics are retrieved (`TopicStats#printTopicStats`). |
| ```java |
| // write dispatch throttling metrics with `reason` labels to identify specific throttling |
| // causes: by subscription limit, by topic limit, or by broker limit. |
| writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events", |
| subsStats.dispatchThrottledMsgEventsBySubscriptionLimit, cluster, namespace, topic, |
| splitTopicAndPartitionIndexLabel, "subscription", sub, |
| "reason", "subscription"); |
| writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events", |
| subsStats.dispatchThrottledBytesEventsBySubscriptionLimit, cluster, namespace, topic, |
| splitTopicAndPartitionIndexLabel, "subscription", sub, |
| "reason", "subscription"); |
| writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events", |
| subsStats.dispatchThrottledMsgEventsByTopicLimit, cluster, namespace, topic, |
| splitTopicAndPartitionIndexLabel, "subscription", sub, |
| "reason", "topic"); |
| writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events", |
| subsStats.dispatchThrottledBytesEventsByTopicLimit, cluster, namespace, topic, |
| splitTopicAndPartitionIndexLabel, "subscription", sub, |
| "reason", "topic"); |
| writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events", |
| subsStats.dispatchThrottledMsgEventsByBrokerLimit, cluster, namespace, topic, |
| splitTopicAndPartitionIndexLabel, "subscription", sub, |
| "reason", "broker"); |
| writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events", |
| subsStats.dispatchThrottledBytesEventsByBrokerLimit, cluster, namespace, topic, |
| splitTopicAndPartitionIndexLabel, "subscription", sub, |
| "reason", "broker"); |
| ``` |
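The counting rule in step 2 can be exercised in isolation. The following is a minimal, self-contained sketch rather than the broker code: `ThrottleCounterSketch`, `LimiterType`, `ReadLimits`, and `applyLimit` are hypothetical names, `ReadLimits` stands in for the `MutablePair<Integer, Long>`, and the two permit arguments stand in for the values a `DispatchRateLimiter` would return.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

class ThrottleCounterSketch {
    // Hypothetical stand-in for DispatchRateLimiter.Type.
    enum LimiterType { BROKER, TOPIC, SUBSCRIPTION }

    // Hypothetical mutable holder standing in for MutablePair<Integer, Long>.
    static final class ReadLimits {
        int messages;
        long bytes;

        ReadLimits(int messages, long bytes) {
            this.messages = messages;
            this.bytes = bytes;
        }
    }

    // One counter per limiter type, kept separately for messages and bytes.
    private final Map<LimiterType, LongAdder> msgEvents = new EnumMap<>(LimiterType.class);
    private final Map<LimiterType, LongAdder> bytesEvents = new EnumMap<>(LimiterType.class);

    ThrottleCounterSketch() {
        for (LimiterType type : LimiterType.values()) {
            msgEvents.put(type, new LongAdder());
            bytesEvents.put(type, new LongAdder());
        }
    }

    // Caps the read limits by the available permits (a negative value is treated
    // as "no limit applies") and counts one event per dimension that was reduced.
    void applyLimit(LimiterType type, long availableMsgPermits, long availableBytePermits,
                    ReadLimits limits) {
        int originalMessages = limits.messages;
        long originalBytes = limits.bytes;
        if (availableMsgPermits >= 0) {
            // The minimum never exceeds limits.messages, so the cast is safe.
            limits.messages = (int) Math.min(limits.messages, availableMsgPermits);
        }
        if (availableBytePermits >= 0) {
            limits.bytes = Math.min(limits.bytes, availableBytePermits);
        }
        if (limits.messages < originalMessages) {
            msgEvents.get(type).increment();
        }
        if (limits.bytes < originalBytes) {
            bytesEvents.get(type).increment();
        }
    }

    long msgEvents(LimiterType type) {
        return msgEvents.get(type).sum();
    }

    long bytesEvents(LimiterType type) {
        return bytesEvents.get(type).sum();
    }
}
```

In this sketch, applying a topic limiter with 10 message permits to a read of 100 messages reduces the read to 10 and records one message throttle event for the topic limiter. A limiter that leaves both values unchanged records nothing, so the counters track how often a limit actually constrained a read, not how often it was consulted.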
| |
| ## Public-facing Changes |
| |
| ### Metrics |
| |
| 1. `pulsar_subscription_dispatch_throttled_msg_events`: |
|    - Description: The total number of times message dispatching was throttled on a subscription |
|    - Attributes: |
|      - tenant |
|      - namespace |
|      - topic |
|      - subscription |
|      - `reason`: broker/topic/subscription |
|    - Unit: number of throttling events |
| |
| 2. `pulsar_subscription_dispatch_throttled_bytes_events`: |
|    - Description: The total number of times byte dispatching was throttled on a subscription |
|    - Attributes: |
|      - tenant |
|      - namespace |
|      - topic |
|      - subscription |
|      - `reason`: broker/topic/subscription |
|    - Unit: number of throttling events |
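To make the shape of one exported sample concrete, here is a small sketch that renders a line in the Prometheus text exposition format. It is illustrative only: `ThrottleMetricLineSketch` and `metricLine` are hypothetical helpers (not Pulsar's `writeTopicMetric`), and the label set used in the example mirrors the arguments passed in the snippet of step 3 (`cluster`, `namespace`, `topic`, `subscription`, `reason`), which is an assumption about the final output.

```java
import java.util.Map;
import java.util.stream.Collectors;

class ThrottleMetricLineSketch {
    // Escapes a label value as the Prometheus text format requires:
    // backslash first, then double quote and newline.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    // Renders name{k1="v1",k2="v2"} value, keeping the map's iteration order.
    static String metricLine(String name, Map<String, String> labels, long value) {
        String rendered = labels.entrySet().stream()
                .map(e -> e.getKey() + "=\"" + escape(e.getValue()) + "\"")
                .collect(Collectors.joining(","));
        return name + "{" + rendered + "} " + value;
    }
}
```

Called with an insertion-ordered map holding `cluster=standalone`, `namespace=public/default`, `topic=persistent://public/default/orders`, `subscription=sub-a`, and `reason=topic`, and a value of 42, the helper returns:

`pulsar_subscription_dispatch_throttled_msg_events{cluster="standalone",namespace="public/default",topic="persistent://public/default/orders",subscription="sub-a",reason="topic"} 42`

Each subscription therefore yields up to three series per metric, one per `reason` value.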
| |
| ### API |
| Add getters for the throttle event counts to the admin API interface `SubscriptionStats`. |
| ```java |
| /** |
| * Gets the total number of times message dispatching was throttled on a subscription due to subscription rate limits. |
| * @return the count of throttled message events by subscription limit, default is 0. |
| */ |
| long getDispatchThrottledMsgEventsBySubscriptionLimit(); |
| |
| /** |
| * Gets the total number of times byte dispatching was throttled on a subscription due to subscription rate limits. |
| * @return the count of throttled bytes events by subscription limit, default is 0. |
| */ |
| long getDispatchThrottledBytesEventsBySubscriptionLimit(); |
| |
| /** |
| * Gets the total number of times message dispatching was throttled on a subscription due to topic rate limits. |
| * @return the count of throttled message events by topic limit, default is 0. |
| */ |
| long getDispatchThrottledMsgEventsByTopicLimit(); |
| |
| /** |
| * Gets the total number of times byte dispatching was throttled on a subscription due to topic rate limits. |
| * @return the count of throttled bytes events by topic limit, default is 0. |
| */ |
| long getDispatchThrottledBytesEventsByTopicLimit(); |
| |
| /** |
| * Gets the total number of times message dispatching was throttled on a subscription due to broker rate limits. |
| * @return the count of throttled message events by broker limit, default is 0. |
| */ |
| long getDispatchThrottledMsgEventsByBrokerLimit(); |
| |
| /** |
| * Gets the total number of times byte dispatching was throttled on a subscription due to broker rate limits. |
| * @return the count of throttled bytes events by broker limit, default is 0. |
| */ |
| long getDispatchThrottledBytesEventsByBrokerLimit(); |
| ``` |
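Because the getters above return cumulative counts that reset with the broker, topic, or subscription, a consumer of the stats will usually compare two snapshots. The sketch below shows one way to do that under stated assumptions: `ThrottleStats` is a hypothetical interface containing only the six proposed getters (it is not the real `SubscriptionStats`), and `Snapshot`, `total`, and `eventsBetween` are illustrative names.

```java
class ThrottleDeltaSketch {
    // Hypothetical stand-in exposing only the six getters proposed above.
    interface ThrottleStats {
        long getDispatchThrottledMsgEventsBySubscriptionLimit();
        long getDispatchThrottledBytesEventsBySubscriptionLimit();
        long getDispatchThrottledMsgEventsByTopicLimit();
        long getDispatchThrottledBytesEventsByTopicLimit();
        long getDispatchThrottledMsgEventsByBrokerLimit();
        long getDispatchThrottledBytesEventsByBrokerLimit();
    }

    // Immutable snapshot; counts are given in the same order as the getters.
    static final class Snapshot implements ThrottleStats {
        private final long[] counts;

        Snapshot(long... counts) {
            if (counts.length != 6) {
                throw new IllegalArgumentException("expected exactly 6 counters");
            }
            this.counts = counts.clone();
        }

        public long getDispatchThrottledMsgEventsBySubscriptionLimit() { return counts[0]; }
        public long getDispatchThrottledBytesEventsBySubscriptionLimit() { return counts[1]; }
        public long getDispatchThrottledMsgEventsByTopicLimit() { return counts[2]; }
        public long getDispatchThrottledBytesEventsByTopicLimit() { return counts[3]; }
        public long getDispatchThrottledMsgEventsByBrokerLimit() { return counts[4]; }
        public long getDispatchThrottledBytesEventsByBrokerLimit() { return counts[5]; }
    }

    // Sum of all throttle events across reasons and dimensions.
    static long total(ThrottleStats s) {
        return s.getDispatchThrottledMsgEventsBySubscriptionLimit()
                + s.getDispatchThrottledBytesEventsBySubscriptionLimit()
                + s.getDispatchThrottledMsgEventsByTopicLimit()
                + s.getDispatchThrottledBytesEventsByTopicLimit()
                + s.getDispatchThrottledMsgEventsByBrokerLimit()
                + s.getDispatchThrottledBytesEventsByBrokerLimit();
    }

    // Events counted between two snapshots. If the total went down, the counters
    // were reset in between, so only the events since the reset are known.
    static long eventsBetween(ThrottleStats earlier, ThrottleStats later) {
        long before = total(earlier);
        long after = total(later);
        return after >= before ? after - before : after;
    }
}
```

A subscription whose backlog grows while `eventsBetween` stays at 0 over the same interval is a candidate for insufficient consumer capacity, which is the case the Goals section aims to isolate.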
| |
| ### Configuration |
| - None |
| |
| # Backward & Forward Compatibility |
| - Full Compatibility |
| |