PIP-406: Introduce metrics related to dispatch throttled events

Background knowledge

Motivation

Currently, users can monitor subscription backlogs using the pulsar_subscription_back_log_no_delayed metric. However, when dispatch throttling is configured at the broker, topic, or subscription level, this metric alone cannot tell whether a backlog is caused by insufficient consumer capacity or by dispatch throttling.

Goals

Introduce metrics that count how many times message or byte dispatching was throttled by broker-, topic-, or subscription-level rate limits. With these metrics, users can write PromQL queries that find subscriptions with a high backlog but little or no throttling. Such backlogs are caused by insufficient consumer capacity.
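The kind of check these metrics enable can be sketched in Java. The helper is hypothetical. BacklogDiagnosis, throttleEventsInWindow, isConsumerBound and the thresholds are illustrative names, not Pulsar APIs. In practice the same logic would be written as a PromQL query over pulsar_subscription_back_log_no_delayed and the new counters.

The counters are cumulative and in-memory, so they reset to zero on a broker restart. For that reason, the sketch treats a decrease between two scrapes as a counter reset, which is similar in spirit to how Prometheus handles counter resets.

```java
import java.util.Map;

/** Hypothetical helper illustrating the diagnosis; not part of Pulsar. */
final class BacklogDiagnosis {

    /**
     * Throttle events observed between two scrapes of a cumulative counter.
     * A decrease means the counter was reset, so the current value is taken as the delta.
     */
    static long throttleEventsInWindow(long previous, long current) {
        return current >= previous ? current - previous : current;
    }

    /**
     * True when the backlog is high but dispatching was rarely throttled in the window,
     * which points at insufficient consumer capacity rather than rate limiting.
     */
    static boolean isConsumerBound(long backlogNoDelayed,
                                   Map<String, long[]> throttleCountersByReason,
                                   long backlogThreshold,
                                   long maxThrottleEvents) {
        if (backlogNoDelayed < backlogThreshold) {
            return false; // backlog is not a concern
        }
        long totalEvents = 0;
        for (long[] prevAndCurrent : throttleCountersByReason.values()) {
            totalEvents += throttleEventsInWindow(prevAndCurrent[0], prevAndCurrent[1]);
        }
        return totalEvents <= maxThrottleEvents;
    }

    public static void main(String[] args) {
        // reason label -> {value at previous scrape, value at current scrape}
        Map<String, long[]> counters = Map.of(
                "broker", new long[] {10, 10},
                "topic", new long[] {4, 4},
                "subscription", new long[] {7, 8});
        System.out.println(isConsumerBound(50_000, counters, 10_000, 5)); // true
    }
}
```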

In Scope

  • Introduce the metric pulsar_subscription_dispatch_throttled_msg_events to represent the total number of times message dispatching was throttled on a subscription
  • Introduce the metric pulsar_subscription_dispatch_throttled_bytes_events to represent the total number of times byte dispatching was throttled on a subscription

Throttling on a subscription can be caused by any of several rate limiters (broker, topic, or subscription). These metrics therefore carry a reason label whose value (broker, topic, or subscription) identifies the responsible throttler.

Out of Scope

  • These counters are not persisted. They reset when the broker restarts, the topic is reloaded, or the subscription reconnects.

High Level Design

  1. Maintain counters for throttling caused by broker, topic, and subscription rate limits, and increment the appropriate counter whenever throttling occurs.
  2. Expose these counters when metrics are retrieved.
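The two steps can be sketched with one LongAdder per (dimension, reason) pair, keyed by an enum. ThrottleCounters, its methods, and the Reason enum are hypothetical. The proposal itself uses six explicit LongAdder fields in AbstractBaseDispatcher, and this keyed form only expresses the same idea more compactly.

```java
import java.util.EnumMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-subscription throttle counters, one pair per throttling reason. */
final class ThrottleCounters {

    /** Values of the proposed reason label. */
    enum Reason { BROKER, TOPIC, SUBSCRIPTION }

    private final Map<Reason, LongAdder> msgEvents = new EnumMap<>(Reason.class);
    private final Map<Reason, LongAdder> bytesEvents = new EnumMap<>(Reason.class);

    ThrottleCounters() {
        // Fill the maps once; afterwards they are only read, and LongAdder handles concurrent increments.
        for (Reason r : Reason.values()) {
            msgEvents.put(r, new LongAdder());
            bytesEvents.put(r, new LongAdder());
        }
    }

    // Step 1: count one event per read whose message or byte budget was cut by a limiter.
    void recordMsgThrottled(Reason reason) { msgEvents.get(reason).increment(); }
    void recordBytesThrottled(Reason reason) { bytesEvents.get(reason).increment(); }

    // Step 2: read the cumulative values when metrics are collected.
    long msgEventCount(Reason reason) { return msgEvents.get(reason).sum(); }
    long bytesEventCount(Reason reason) { return bytesEvents.get(reason).sum(); }

    /** Label value used in the exported metric, e.g. reason="topic". */
    static String labelValue(Reason reason) { return reason.name().toLowerCase(Locale.ROOT); }
}
```

LongAdder suits this write-heavy, read-rarely pattern. Increments on the dispatch path stay cheap under contention, and sum() is only called when metrics are scraped.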

Detailed Design

Design & Implementation Details

  1. Maintain these fields in AbstractBaseDispatcher.
    public final LongAdder dispatchThrottledMsgEventsBySubscriptionLimit = new LongAdder();
    public final LongAdder dispatchThrottledMsgEventsByTopicLimit = new LongAdder();
    public final LongAdder dispatchThrottledMsgEventsByBrokerLimit = new LongAdder();
    public final LongAdder dispatchThrottledBytesEventsBySubscriptionLimit = new LongAdder();
    public final LongAdder dispatchThrottledBytesEventsByTopicLimit = new LongAdder();
    public final LongAdder dispatchThrottledBytesEventsByBrokerLimit = new LongAdder();
  2. On each read, if a rate limiter reduces the number of messages or bytes to read, increment the corresponding counter by one.
     private boolean applyDispatchRateLimitsToReadLimits(DispatchRateLimiter rateLimiter,
                                                         MutablePair<Integer, Long> readLimits,
                                                         DispatchRateLimiter.Type limiterType) {
+       int originalMessagesToRead = readLimits.getLeft();
+       long originalBytesToRead = readLimits.getRight();
        int availablePermitsOnMsg = (int) rateLimiter.getAvailableDispatchRateLimitOnMsg();
        if (availablePermitsOnMsg >= 0) {
            readLimits.setLeft(Math.min(readLimits.getLeft(), availablePermitsOnMsg));
        }
        long availablePermitsOnByte = rateLimiter.getAvailableDispatchRateLimitOnByte();
        if (availablePermitsOnByte >= 0) {
            readLimits.setRight(Math.min(readLimits.getRight(), availablePermitsOnByte));
        }
+       if (readLimits.getLeft() < originalMessagesToRead) {
+           switch (limiterType) {
+               case BROKER -> dispatchThrottledMsgEventsByBrokerLimit.increment();
+               case TOPIC -> dispatchThrottledMsgEventsByTopicLimit.increment();
+               case SUBSCRIPTION -> dispatchThrottledMsgEventsBySubscriptionLimit.increment();
+               default -> {}
+           }
+       }
+       if (readLimits.getRight() < originalBytesToRead) {
+           switch (limiterType) {
+               case BROKER -> dispatchThrottledBytesEventsByBrokerLimit.increment();
+               case TOPIC -> dispatchThrottledBytesEventsByTopicLimit.increment();
+               case SUBSCRIPTION -> dispatchThrottledBytesEventsBySubscriptionLimit.increment();
+               default -> {}
+           }
+       }
        // ... remainder of the method unchanged
    }
  3. Output these metrics when metrics are retrieved (TopicStats#printTopicStats).
// Write dispatch throttling metrics with a `reason` label identifying the throttling
// cause: subscription limit, topic limit, or broker limit.
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events",
                 subsStats.dispatchThrottledMsgEventsBySubscriptionLimit, cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel, "subscription", sub,
                 "reason", "subscription");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events",
                 subsStats.dispatchThrottledBytesEventsBySubscriptionLimit, cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel, "subscription", sub,
                 "reason", "subscription");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events",
                 subsStats.dispatchThrottledMsgEventsByTopicLimit, cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel, "subscription", sub,
                 "reason", "topic");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events",
                 subsStats.dispatchThrottledBytesEventsByTopicLimit, cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel, "subscription", sub,
                 "reason", "topic");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_msg_events",
                 subsStats.dispatchThrottledMsgEventsByBrokerLimit, cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel, "subscription", sub,
                 "reason", "broker");
writeTopicMetric(stream, "pulsar_subscription_dispatch_throttled_bytes_events",
                 subsStats.dispatchThrottledBytesEventsByBrokerLimit, cluster, namespace, topic,
                 splitTopicAndPartitionIndexLabel, "subscription", sub,
                 "reason", "broker");
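The counting rule in applyDispatchRateLimitsToReadLimits can be exercised without a broker. To stay self-contained, the sketch replaces Pulsar's DispatchRateLimiter and MutablePair<Integer, Long> with hypothetical stand-ins (Permits, ReadLimits). As in the snippet above, a negative permit value means "no limit". The sketch also assumes that the limiters are applied one after another to the same read limits.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

/** Self-contained sketch of the counting logic; stand-ins replace Pulsar's types. */
final class ThrottleCountingSketch {

    enum LimiterType { BROKER, TOPIC, SUBSCRIPTION }

    /** Stand-in for DispatchRateLimiter: a negative value means no limit is configured. */
    record Permits(long onMsg, long onByte) { }

    /** Stand-in for MutablePair<Integer, Long>. */
    static final class ReadLimits {
        int messages;
        long bytes;
        ReadLimits(int messages, long bytes) { this.messages = messages; this.bytes = bytes; }
    }

    final Map<LimiterType, LongAdder> msgEvents = new EnumMap<>(LimiterType.class);
    final Map<LimiterType, LongAdder> bytesEvents = new EnumMap<>(LimiterType.class);

    ThrottleCountingSketch() {
        for (LimiterType t : LimiterType.values()) {
            msgEvents.put(t, new LongAdder());
            bytesEvents.put(t, new LongAdder());
        }
    }

    /** Caps the read limits by the limiter's permits and counts one event per reduced dimension. */
    void apply(Permits permits, ReadLimits limits, LimiterType type) {
        int originalMessages = limits.messages;
        long originalBytes = limits.bytes;
        if (permits.onMsg() >= 0) {
            // The result is <= limits.messages, so the cast back to int is safe.
            limits.messages = (int) Math.min(limits.messages, permits.onMsg());
        }
        if (permits.onByte() >= 0) {
            limits.bytes = Math.min(limits.bytes, permits.onByte());
        }
        if (limits.messages < originalMessages) {
            msgEvents.get(type).increment();
        }
        if (limits.bytes < originalBytes) {
            bytesEvents.get(type).increment();
        }
    }

    public static void main(String[] args) {
        ThrottleCountingSketch d = new ThrottleCountingSketch();
        ReadLimits limits = new ReadLimits(100, 5 * 1024 * 1024);
        d.apply(new Permits(-1, -1), limits, LimiterType.BROKER);       // unlimited: no event
        d.apply(new Permits(500, 1024), limits, LimiterType.TOPIC);     // bytes cut: one bytes event
        d.apply(new Permits(20, -1), limits, LimiterType.SUBSCRIPTION); // messages cut: one msg event
        System.out.println(limits.messages + " msgs, " + limits.bytes + " bytes"); // 20 msgs, 1024 bytes
    }
}
```

Under the sequential assumption, each limiter that lowers the budget below what earlier limiters left records its own event. One read can therefore be counted under more than one reason. Message and byte events are tracked independently, so a single limiter can also add to both counters.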

Public-facing Changes

Metrics

  1. pulsar_subscription_dispatch_throttled_msg_events:
  • Description: The total number of times message dispatching was throttled on a subscription
  • Attributes:
    • tenant
    • namespace
    • topic
    • subscription
    • reason: broker/topic/subscription
  • Unit: events (count)
  2. pulsar_subscription_dispatch_throttled_bytes_events:
  • Description: The total number of times byte dispatching was throttled on a subscription
  • Attributes:
    • tenant
    • namespace
    • topic
    • subscription
    • reason: broker/topic/subscription
  • Unit: events (count)
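On the Prometheus endpoint, the writeTopicMetric calls in the Detailed Design emit one sample per reason. The sketch formats such a sample in the Prometheus text exposition format. The label set mirrors the arguments passed to writeTopicMetric (cluster, namespace, topic, subscription, reason). ThrottleMetricLines and the example values are hypothetical. This is not Pulsar's actual writer, and it ignores the optional topic/partition split controlled by splitTopicAndPartitionIndexLabel.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical formatter for one Prometheus text-format sample; not Pulsar's writer. */
final class ThrottleMetricLines {

    /** Escapes a label value as the text exposition format requires: backslash, quote, newline. */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
    }

    static String sample(String name, Map<String, String> labels, long value) {
        StringBuilder sb = new StringBuilder(name).append('{');
        boolean first = true;
        for (Map.Entry<String, String> e : labels.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(e.getKey()).append("=\"").append(escape(e.getValue())).append('"');
            first = false;
        }
        return sb.append("} ").append(value).toString();
    }

    static Map<String, String> labels(String cluster, String namespace, String topic,
                                      String subscription, String reason) {
        Map<String, String> labels = new LinkedHashMap<>(); // keeps label order stable
        labels.put("cluster", cluster);
        labels.put("namespace", namespace);
        labels.put("topic", topic);
        labels.put("subscription", subscription);
        labels.put("reason", reason);
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(sample("pulsar_subscription_dispatch_throttled_msg_events",
                labels("standalone", "public/default", "persistent://public/default/t1", "sub-1", "topic"),
                42));
    }
}
```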

API

Add getters for the throttle counts to the admin API interface SubscriptionStats.

/**
 * Gets the total number of times message dispatching was throttled on a subscription due to subscription rate limits.
 * @return the count of throttled message events by subscription limit, default is 0.
 */
long getDispatchThrottledMsgEventsBySubscriptionLimit();

/**
 * Gets the total number of times byte dispatching was throttled on a subscription due to subscription rate limits.
 * @return the count of throttled bytes events by subscription limit, default is 0.
 */
long getDispatchThrottledBytesEventsBySubscriptionLimit();

/**
 * Gets the total number of times message dispatching was throttled on a subscription due to topic rate limits.
 * @return the count of throttled message events by topic limit, default is 0.
 */
long getDispatchThrottledMsgEventsByTopicLimit();

/**
 * Gets the total number of times byte dispatching was throttled on a subscription due to topic rate limits.
 * @return the count of throttled bytes events by topic limit, default is 0.
 */
long getDispatchThrottledBytesEventsByTopicLimit();

/**
 * Gets the total number of times message dispatching was throttled on a subscription due to broker rate limits.
 * @return the count of throttled message events by broker limit, default is 0.
 */
long getDispatchThrottledMsgEventsByBrokerLimit();

/**
 * Gets the total number of times byte dispatching was throttled on a subscription due to broker rate limits.
 * @return the count of throttled bytes events by broker limit, default is 0.
 */
long getDispatchThrottledBytesEventsByBrokerLimit();
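The getters can be used to see which throttler dominates for a subscription. With a PulsarAdmin client, the stats would typically come from admin.topics().getStats(topic).getSubscriptions().get(subscription). To stay self-contained, the sketch uses a hypothetical stand-in interface (ThrottleStats) with the six proposed getters. FixedStats and ThrottleReport are likewise illustrative names.

```java
/** Stand-in mirroring the six getters proposed for SubscriptionStats (not the real interface). */
interface ThrottleStats {
    long getDispatchThrottledMsgEventsBySubscriptionLimit();
    long getDispatchThrottledBytesEventsBySubscriptionLimit();
    long getDispatchThrottledMsgEventsByTopicLimit();
    long getDispatchThrottledBytesEventsByTopicLimit();
    long getDispatchThrottledMsgEventsByBrokerLimit();
    long getDispatchThrottledBytesEventsByBrokerLimit();
}

/** Fixed values for demonstration; a real caller would read them from SubscriptionStats. */
record FixedStats(long subMsg, long subBytes, long topicMsg, long topicBytes,
                  long brokerMsg, long brokerBytes) implements ThrottleStats {
    public long getDispatchThrottledMsgEventsBySubscriptionLimit() { return subMsg; }
    public long getDispatchThrottledBytesEventsBySubscriptionLimit() { return subBytes; }
    public long getDispatchThrottledMsgEventsByTopicLimit() { return topicMsg; }
    public long getDispatchThrottledBytesEventsByTopicLimit() { return topicBytes; }
    public long getDispatchThrottledMsgEventsByBrokerLimit() { return brokerMsg; }
    public long getDispatchThrottledBytesEventsByBrokerLimit() { return brokerBytes; }
}

final class ThrottleReport {
    /**
     * Names the throttler with the most message + byte events, or "none".
     * A read cut on both dimensions counts once in each, so this is only a rough ranking.
     * Ties are resolved in the order broker, topic, subscription.
     */
    static String dominantReason(ThrottleStats s) {
        long bySubscription = s.getDispatchThrottledMsgEventsBySubscriptionLimit()
                + s.getDispatchThrottledBytesEventsBySubscriptionLimit();
        long byTopic = s.getDispatchThrottledMsgEventsByTopicLimit()
                + s.getDispatchThrottledBytesEventsByTopicLimit();
        long byBroker = s.getDispatchThrottledMsgEventsByBrokerLimit()
                + s.getDispatchThrottledBytesEventsByBrokerLimit();
        if (bySubscription == 0 && byTopic == 0 && byBroker == 0) {
            return "none";
        }
        if (byBroker >= byTopic && byBroker >= bySubscription) {
            return "broker";
        }
        return byTopic >= bySubscription ? "topic" : "subscription";
    }

    public static void main(String[] args) {
        System.out.println(dominantReason(new FixedStats(1, 2, 10, 0, 3, 3))); // topic
    }
}
```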

Configuration

  • None

Backward & Forward Compatibility

  • Full Compatibility