blob: 5a509dac8a9f11ebc57f213ea4dcdae21073fae3 [file] [log] [blame] [view]
# PIP-406: Introduce metrics related to dispatch throttled events
# Background knowledge
## Motivation
Currently, users can monitor subscription backlogs using the `pulsar_subscription_back_log_no_delayed` metric.
However, if [dispatch throttling](https://pulsar.apache.org/docs/next/concepts-throttling/) is configured at the broker/topic/subscription level,
this metric may not accurately reflect whether the backlog is due to insufficient consumer capacity, as it could be caused by dispatch throttling.
## Goals
Introduce metrics to indicate the count of `messages/bytes throttled` for **broker/topic/subscription** level rate limit.
This allows users to write PromQL queries to identify subscriptions with high backlogs but low or no throttling, pinpointing backlogs caused by insufficient consumer capacity.
## In Scope
- Introduce the metric `pulsar_subscription_dispatch_throttled_msg_events` to represent the total number of times message dispatching was throttled on a subscription
- Introduce the metric `pulsar_subscription_dispatch_throttled_bytes_events` to represent the total number of times byte dispatching was throttled on a subscription
Since throttling on a subscription can be caused by multiple rate limit(broker/topic/subscription),
these metrics will include a label `reason: broker/topic/subscription` to indicate which throttler is responsible.
## Out of Scope
- These states are not persistent and will reset upon broker restart/ topic re-load / subscription reconnected.
# High Level Design
1. Maintain metrics counters for throttling caused by `broker/topic/subscription` rate limit. Increment the appropriate counter whenever throttling occurs.
2. Output these fields when retrieving metrics.
# Detailed Design
## Design & Implementation Details
1. Maintain these fields in AbstractBaseDispatcher.
```java
public final LongAdder dispatchThrottledMsgEventsBySubscriptionLimit = new LongAdder();
public final LongAdder dispatchThrottledMsgEventsByTopicLimit = new LongAdder();
public final LongAdder dispatchThrottledMsgEventsByBrokerLimit = new LongAdder();
public final LongAdder dispatchThrottledBytesEventsBySubscriptionLimit = new LongAdder();
public final LongAdder dispatchThrottledBytesEventsByTopicLimit = new LongAdder();
public final LongAdder dispatchThrottledBytesEventsByBrokerLimit = new LongAdder();
```
2. Each time a read occurs, if the expected number of messages or bytes is reduced, increment the metric by one.
```diff
private boolean applyDispatchRateLimitsToReadLimits(DispatchRateLimiter rateLimiter,
MutablePair<Integer, Long> readLimits,
DispatchRateLimiter.Type limiterType) {
+ int originalMessagesToRead = readLimits.getLeft();
+ long originalBytesToRead = readLimits.getRight();
int availablePermitsOnMsg = (int) rateLimiter.getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg >= 0) {
readLimits.setLeft(Math.min(readLimits.getLeft(), availablePermitsOnMsg));
}
long availablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte();
if (availablePermitsOnByte >= 0) {
readLimits.setRight(Math.min(readLimits.getRight(), availablePermitsOnByte));
}
+ if (readLimits.getLeft() < originalMessagesToRead) {
+ switch (limiterType) {
+ case BROKER -> dispatchThrottledMsgEventsByBrokerLimit.increment();
+ case TOPIC -> dispatchThrottledMsgEventsByTopicLimit.increment();
+ case SUBSCRIPTION -> dispatchThrottledMsgEventsBySubscriptionLimit.increment();
+ default -> {}
+ }
+ }
+ if (readLimits.getRight() < originalBytesToRead) {
+ switch (limiterType) {
+ case BROKER -> dispatchThrottledBytesEventsByBrokerLimit.increment();
+ case TOPIC -> dispatchThrottledBytesEventsByTopicLimit.increment();
+ case SUBSCRIPTION -> dispatchThrottledBytesEventsBySubscriptionLimit.increment();
+ default -> {}
+ }
+ }
```
3. Print these metrics when retrieving metrics(TopicStats#printTopicStats).
```java
// write dispatch throttling metrics with `reason` labels to identify specific throttling
// causes: by subscription limit, by topic limit, or by broker limit.
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events",
subsStats.dispatchThrottledMsgEventsBySubscriptionLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "subscription");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events",
subsStats.dispatchThrottledBytesEventsBySubscriptionLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "subscription");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events",
subsStats.dispatchThrottledMsgEventsByTopicLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "topic");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events",
subsStats.dispatchThrottledBytesEventsByTopicLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "topic");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events",
subsStats.dispatchThrottledMsgEventsByBrokerLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "broker");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events",
subsStats.dispatchThrottledBytesEventsByBrokerLimit, cluster, namespace, topic,
splitTopicAndPartitionIndexLabel, "subscription", sub,
"reason", "broker");}
```
## Public-facing Changes
### Metrics
1. `pulsar_subscription_dispatch_throttled_msg_events`:
- Description: The total number of times message dispatching was throttled on a subscription
- Attributes:
- tenant
- namespace
- topic
- subscription
- `reason`: broker/topic/subscription
- Unit: messages count
2. `pulsar_subscription_dispatch_throttled_bytes_events`:
- Description: The total number of times byte dispatching was throttled on a subscription
- Attributes:
- tenant
- namespace
- topic
- subscription
- `reason`: broker/topic/subscription
- Unit: messages count
### API
Add get throttle count interface on admin api `SubscriptionStats`.
```java
/**
* Gets the total number of times message dispatching was throttled on a subscription due to broker rate limits.
* @return the count of throttled message events by subscription limit, default is 0.
*/
long getDispatchThrottledMsgEventsBySubscriptionLimit();
/**
* Gets the total number of times bytes dispatching was throttled on a subscription due to broker rate limits.
* @return the count of throttled bytes by subscription limit, default is 0.
*/
long getDispatchThrottledBytesEventsBySubscriptionLimit();
/**
* Gets the total number of times message dispatching was throttled on a subscription due to topic rate limits.
* @return the count of throttled message events by topic limit, default is 0.
*/
long getDispatchThrottledMsgEventsByTopicLimit();
/**
* Gets the total number of times bytes dispatching was throttled on a subscription due to topic rate limits.
* @return the count of throttled bytes events by topic limit, default is 0.
*/
long getDispatchThrottledBytesEventsByTopicLimit();
/**
* Gets the total number of times message dispatching was throttled on a subscription due to broker rate limits.
* @return the count of throttled message events by broker limit, default is 0.
*/
long getDispatchThrottledMsgEventsByBrokerLimit();
/**
* Gets the total number of times bytes dispatching was throttled on a subscription due to broker rate limits.
* @return the count of throttled bytes events by broker limit, default is 0.
*/
long getDispatchThrottledBytesEventsByBrokerLimit();
```
### Configuration
- None
# Backward & Forward Compatibility
- Full Compatibility