Currently, users can monitor subscription backlogs using the pulsar_subscription_back_log_no_delayed
metric. However, if dispatch throttling is configured at the broker/topic/subscription level, this metric may not accurately reflect whether the backlog is due to insufficient consumer capacity, as it could be caused by dispatch throttling.
Introduce metrics to indicate the count of messages/bytes throttled
for broker/topic/subscription level rate limit. This allows users to write PromQL queries to identify subscriptions with high backlogs but low or no throttling, pinpointing backlogs caused by insufficient consumer capacity.
pulsar_subscription_dispatch_throttled_msg_events
to represent the total number of times message dispatching was throttled on a subscriptionpulsar_subscription_dispatch_throttled_bytes_events
to represent the total number of times byte dispatching was throttled on a subscriptionSince throttling on a subscription can be caused by multiple rate limit(broker/topic/subscription), these metrics will include a label reason: broker/topic/subscription
to indicate which throttler is responsible.
broker/topic/subscription
rate limit. Increment the appropriate counter whenever throttling occurs.public final LongAdder dispatchThrottledMsgEventsBySubscriptionLimit = new LongAdder(); public final LongAdder dispatchThrottledMsgEventsByTopicLimit = new LongAdder(); public final LongAdder dispatchThrottledMsgEventsByBrokerLimit = new LongAdder(); public final LongAdder dispatchThrottledBytesEventsBySubscriptionLimit = new LongAdder(); public final LongAdder dispatchThrottledBytesEventsByTopicLimit = new LongAdder(); public final LongAdder dispatchThrottledBytesEventsByBrokerLimit = new LongAdder();
private boolean applyDispatchRateLimitsToReadLimits(DispatchRateLimiter rateLimiter, MutablePair<Integer, Long> readLimits, DispatchRateLimiter.Type limiterType) { + int originalMessagesToRead = readLimits.getLeft(); + long originalBytesToRead = readLimits.getRight(); int availablePermitsOnMsg = (int) rateLimiter.getAvailableDispatchRateLimitOnMsg(); if (availablePermitsOnMsg >= 0) { readLimits.setLeft(Math.min(readLimits.getLeft(), availablePermitsOnMsg)); } long availablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte(); if (availablePermitsOnByte >= 0) { readLimits.setRight(Math.min(readLimits.getRight(), availablePermitsOnByte)); } + if (readLimits.getLeft() < originalMessagesToRead) { + switch (limiterType) { + case BROKER -> dispatchThrottledMsgEventsByBrokerLimit.increment(); + case TOPIC -> dispatchThrottledMsgEventsByTopicLimit.increment(); + case SUBSCRIPTION -> dispatchThrottledMsgEventsBySubscriptionLimit.increment(); + default -> {} + } + } + if (readLimits.getRight() < originalBytesToRead) { + switch (limiterType) { + case BROKER -> dispatchThrottledBytesEventsByBrokerLimit.increment(); + case TOPIC -> dispatchThrottledBytesEventsByTopicLimit.increment(); + case SUBSCRIPTION -> dispatchThrottledBytesEventsBySubscriptionLimit.increment(); + default -> {} + } + }
// write dispatch throttling metrics with `reason` labels to identify specific throttling // causes: by subscription limit, by topic limit, or by broker limit. writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events", subsStats.dispatchThrottledMsgEventsBySubscriptionLimit, cluster, namespace, topic, splitTopicAndPartitionIndexLabel, "subscription", sub, "reason", "subscription"); writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events", subsStats.dispatchThrottledBytesEventsBySubscriptionLimit, cluster, namespace, topic, splitTopicAndPartitionIndexLabel, "subscription", sub, "reason", "subscription"); writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events", subsStats.dispatchThrottledMsgEventsByTopicLimit, cluster, namespace, topic, splitTopicAndPartitionIndexLabel, "subscription", sub, "reason", "topic"); writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events", subsStats.dispatchThrottledBytesEventsByTopicLimit, cluster, namespace, topic, splitTopicAndPartitionIndexLabel, "subscription", sub, "reason", "topic"); writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events", subsStats.dispatchThrottledMsgEventsByBrokerLimit, cluster, namespace, topic, splitTopicAndPartitionIndexLabel, "subscription", sub, "reason", "broker"); writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events", subsStats.dispatchThrottledBytesEventsByBrokerLimit, cluster, namespace, topic, splitTopicAndPartitionIndexLabel, "subscription", sub, "reason", "broker");}
pulsar_subscription_dispatch_throttled_msg_events
:reason
: broker/topic/subscriptionpulsar_subscription_dispatch_throttled_bytes_events
:reason
: broker/topic/subscriptionAdd get throttle count interface on admin api SubscriptionStats
.
/** * Gets the total number of times message dispatching was throttled on a subscription due to broker rate limits. * @return the count of throttled message events by subscription limit, default is 0. */ long getDispatchThrottledMsgEventsBySubscriptionLimit(); /** * Gets the total number of times bytes dispatching was throttled on a subscription due to broker rate limits. * @return the count of throttled bytes by subscription limit, default is 0. */ long getDispatchThrottledBytesEventsBySubscriptionLimit(); /** * Gets the total number of times message dispatching was throttled on a subscription due to topic rate limits. * @return the count of throttled message events by topic limit, default is 0. */ long getDispatchThrottledMsgEventsByTopicLimit(); /** * Gets the total number of times bytes dispatching was throttled on a subscription due to topic rate limits. * @return the count of throttled bytes events by topic limit, default is 0. */ long getDispatchThrottledBytesEventsByTopicLimit(); /** * Gets the total number of times message dispatching was throttled on a subscription due to broker rate limits. * @return the count of throttled message events by broker limit, default is 0. */ long getDispatchThrottledMsgEventsByBrokerLimit(); /** * Gets the total number of times bytes dispatching was throttled on a subscription due to broker rate limits. * @return the count of throttled bytes events by broker limit, default is 0. */ long getDispatchThrottledBytesEventsByBrokerLimit();