PIP-323: Complete Backlog Quota Telemetry

Background knowledge

Backlog

A topic in Pulsar is where messages are written. Messages are consumed through subscriptions. A topic can have many subscriptions, and each subscription maintains its own message acknowledgment state: which messages were acknowledged and which were not.

A subscription backlog is the set of unacknowledged messages in that subscription. A subscription backlog size is the sum of the sizes of the unacknowledged messages (in bytes).

Since a topic can have many subscriptions, and each has its own backlog, how does one define a backlog for a topic? A topic backlog is defined as the backlog of the subscription which has the oldest unacknowledged message. Since acknowledged messages can be interleaved with unacknowledged messages, calculating the exact size of that subscription backlog can be expensive, as it requires I/O operations to read the messages from the ledgers. For that reason, the topic backlog size is actually defined to be the estimated backlog size of that subscription. The estimate is the sum of the sizes of all the ledgers, starting from the current active one (the one being written to), up to the ledger which contains the oldest unacknowledged message for that subscription (there is actually a faster way to calculate it, but this was the definition chosen for this estimation in Pulsar).
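The estimation described above can be sketched in a few lines. This is an illustrative model only, not broker code: the `Ledger` class and the `estimated_backlog_size` function are hypothetical names, and the sketch assumes ledger IDs increase over time so that the active ledger has the highest ID.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Ledger:
    ledger_id: int   # assumed to increase monotonically over time
    size_bytes: int  # ledger size as kept in the in-memory ledger list


def estimated_backlog_size(ledgers: List[Ledger], oldest_unacked_ledger_id: int) -> int:
    """Sum the sizes of every ledger from the one holding the oldest
    unacknowledged message up to the active (most recent) ledger."""
    return sum(l.size_bytes for l in ledgers if l.ledger_id >= oldest_unacked_ledger_id)


ledgers = [Ledger(1, 100), Ledger(2, 250), Ledger(3, 40)]
# The oldest unacknowledged message lives in ledger 2, so ledgers 2 and 3 count.
size = estimated_backlog_size(ledgers, oldest_unacked_ledger_id=2)
```

Because whole ledgers are counted, the estimate can exceed the exact backlog size when some messages in those ledgers were already acknowledged.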

A topic backlog age is the age of the oldest unacknowledged message (same subscription as defined for topic backlog size). If that message was written 30 minutes ago, its age is 30 minutes, and so is the topic backlog age.

Backlog Quota

Pulsar has a feature called backlog quota. It lets a user define a quota, in effect a limit, on the topic backlog. There are two types of quotas:

  1. Size based: The limit is for the topic backlog size (as we defined above).
  2. Time based: The limit is for the topic backlog age (as we defined above).

Once a topic backlog exceeds either one of those limits, one of the following actions is taken to hold the backlog to that limit, depending on the chosen policy:

  • The producer write is placed on hold for a certain amount of time before failing.
  • The producer write is failed.
  • The subscription's oldest unacknowledged messages are acknowledged in order until both the topic backlog size and age fall within the limit (quota). This process is called backlog eviction (it happens every check interval).
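The eviction action in the last bullet can be pictured with a small in-memory model. The sketch below is illustrative only: `Message`, `evict` and the plain list standing in for a backlog are hypothetical, and the real broker works on ledgers and cursors rather than on a list of messages.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Message:
    size_bytes: int
    publish_time_s: float


def evict(backlog: List[Message], now_s: float,
          limit_bytes: Optional[int], limit_seconds: Optional[int]) -> int:
    """Acknowledge (drop) the oldest messages, in order, until the backlog
    is within both quotas. `backlog` is ordered oldest first.
    Returns the number of messages evicted."""
    evicted = 0
    total = sum(m.size_bytes for m in backlog)
    while backlog:
        oldest = backlog[0]
        over_size = limit_bytes is not None and total > limit_bytes
        over_age = (limit_seconds is not None
                    and now_s - oldest.publish_time_s > limit_seconds)
        if not (over_size or over_age):
            break
        total -= oldest.size_bytes
        backlog.pop(0)
        evicted += 1
    return evicted


backlog = [Message(100, 0.0), Message(100, 50.0), Message(100, 90.0)]
# First message goes because of size, second because of age.
evicted = evict(backlog, now_s=100.0, limit_bytes=250, limit_seconds=30)
```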

The quotas can be defined as a default value for any topic, by using the following broker configuration keys: backlogQuotaDefaultLimitBytes and backlogQuotaDefaultLimitSecond.

The quota can also be specified directly for all topics in a given namespace using the namespace policy, or a specific topic using a topic policy.
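The three configuration levels can be thought of as a lookup chain. The sketch below assumes that the most specific setting wins (topic policy, then namespace policy, then broker default); this document does not spell out the precedence, so treat it as an assumption. `effective_limit` is a hypothetical helper, not a Pulsar API.

```python
from typing import Optional


def effective_limit(topic_policy: Optional[int],
                    namespace_policy: Optional[int],
                    broker_default: int) -> int:
    """Resolve the quota limit for a topic, assuming the most specific
    configured level takes precedence."""
    if topic_policy is not None:
        return topic_policy
    if namespace_policy is not None:
        return namespace_policy
    return broker_default


# No topic policy set, so the namespace policy applies.
limit = effective_limit(topic_policy=None, namespace_policy=3600, broker_default=86400)
```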

Monitoring Backlog Quota

Today the user can calculate the quota usage for the size-based limit, since two metrics are exposed on a topic level: pulsar_storage_backlog_quota_limit and pulsar_storage_backlog_size. Dividing pulsar_storage_backlog_size by pulsar_storage_backlog_quota_limit gives the fraction of the quota in use, which shows how close the topic backlog is to its size limit.

For the time-based limit, the only metric exposed today is the quota itself: pulsar_storage_backlog_quota_limit_time.
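As a worked example of the division, here is a minimal sketch. The function name is hypothetical, and treating a non-positive limit as "no quota configured" is an assumption made for the example.

```python
from typing import Optional


def size_quota_usage(backlog_size_bytes: float, quota_limit_bytes: float) -> Optional[float]:
    """Return pulsar_storage_backlog_size / pulsar_storage_backlog_quota_limit
    as a fraction, or None when no usable limit is set (assumption: a
    non-positive limit means the quota is disabled)."""
    if quota_limit_bytes <= 0:
        return None
    return backlog_size_bytes / quota_limit_bytes


# 750 MB of backlog against a 1000 MB limit is 75% of the quota.
usage = size_quota_usage(750_000_000, 1_000_000_000)
```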

Backlog Quota Eviction in the Broker

The broker has a method called BrokerService.monitorBacklogQuota(). It is scheduled to run every x seconds, as defined by the configuration backlogQuotaCheckIntervalInSeconds. This method loops over all persistent topics, and for each topic it checks whether the topic backlog exceeded either one of those quotas.

As mentioned before, checking backlog size is a memory-only calculation, since each topic keeps its list of ledgers in memory, including the size of each ledger. The same goes for the subscriptions: they are all stored in memory, and the ManagedCursor keeps track of the subscription with the oldest unacknowledged message, so retrieving it is O(1). Checking backlog based on time is costly if the configuration key preciseTimeBasedBacklogQuotaCheck is set to true. In that case, the broker needs to read the oldest message to obtain its publish timestamp, which is expensive in terms of I/O. If it is set to false, the check is in-memory only, since it uses the age of the ledger instead of the message, and the ledger metadata is kept in memory.
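The difference between the precise and the non-precise time check can be sketched as follows. This is a simplified model: the callable stands in for the storage read, and which ledger timestamp the broker actually uses in the non-precise case is not specified here, so `ledger_timestamp_s` is a placeholder assumption.

```python
from typing import Callable


def backlog_age_seconds(now_s: float, precise: bool,
                        read_oldest_publish_time_s: Callable[[], float],
                        ledger_timestamp_s: float) -> float:
    """Return the topic backlog age.

    precise=True  -> read the oldest unacknowledged message (I/O) and use
                     its publish timestamp.
    precise=False -> use a timestamp from in-memory ledger metadata only.
    """
    if precise:
        reference_s = read_oldest_publish_time_s()  # costs a storage read
    else:
        reference_s = ledger_timestamp_s            # memory only, coarser
    return max(0.0, now_s - reference_s)


reads = []

def read_oldest_publish_time_s() -> float:
    reads.append(1)  # record that a (simulated) storage read happened
    return 100.0

precise_age = backlog_age_seconds(130.0, True, read_oldest_publish_time_s, 90.0)
coarse_age = backlog_age_seconds(130.0, False, read_oldest_publish_time_s, 90.0)
```

In this model the non-precise path never triggers the read, at the price of an age that reflects the ledger rather than the message.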

For each topic which has exceeded its quota, if the policy chosen is eviction, then the process is performed synchronously. This process consumes I/O, as it needs to read messages (using skip) to know where to stop acknowledging messages.

Motivation

Users who have defined a time-based backlog quota have no means today to monitor the backlog quota usage, time-wise, to know whether the topic backlog is close to its time limit or has even passed it.

If it has passed it, the user has no means to know that it happened, when, or how many times.

Goals

In Scope

  • Allow the user to know the backlog quota usage for time-based quota, per topic
  • Allow the user to know how many times backlog eviction happened, and for which backlog quota type

Out of Scope

None

High Level Design

We'll use the existing backlog monitoring process running in intervals. For each topic, the subscription with the oldest unacknowledged message is retrieved, to calculate the topic backlog age. At that point, we will cache the following for the oldest unacknowledged message:

  • Subscription name
  • Message position
  • Message publish timestamp

That cache will allow us to add a metric exposing the topic backlog age - pulsar_storage_backlog_age_seconds, which will be both consistent (the same values used for deciding on backlog eviction) and cheap to retrieve (no additional I/O involved). Coupled with the existing pulsar_storage_backlog_quota_limit_time metric, the user can divide the age by the limit to get the usage of the quota (both are in seconds).
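A rough sketch of the cached record and of how the two metrics combine is shown below. `OldestUnackedInfo`, the tuple used for the position, and both helper functions are hypothetical names chosen for illustration; only the three cached fields come from the design.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class OldestUnackedInfo:
    subscription_name: str
    position: Tuple[int, int]  # hypothetical (ledger id, entry id) pair
    publish_time_s: float      # publish timestamp, in epoch seconds


def age_from_cache(cached: OldestUnackedInfo, now_s: float) -> float:
    """Backlog age derived from the cached publish timestamp (no I/O)."""
    return max(0.0, now_s - cached.publish_time_s)


def time_quota_usage(age_seconds: float, limit_seconds: float) -> float:
    """pulsar_storage_backlog_age_seconds / pulsar_storage_backlog_quota_limit_time."""
    return age_seconds / limit_seconds


cached = OldestUnackedInfo("slow-sub", (12, 345), publish_time_s=1_000.0)
age = age_from_cache(cached, now_s=2_800.0)      # published 30 minutes ago
usage = time_quota_usage(age, limit_seconds=3_600)  # against a 1 hour quota
```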

We will add the subscription name containing the oldest unacknowledged message to the Admin API topic stats endpoints ({tenant}/{namespace}/{topic}/stats and {tenant}/{namespace}/{topic}/partitioned-stats), allowing the user a complete workflow: alert using metrics when the topic backlog quota is about to be exceeded, then query topic stats for that topic to retrieve the name of the subscription which contains the oldest message. For completeness, we will also add the backlog quota limits, both age and size, and the age of the oldest unacknowledged message.

We will add a metric allowing the user to know how many times the usage exceeded the quota, for either time or size - pulsar_storage_backlog_quota_exceeded_evictions_total, where the quota_type label will be either time or size. Monitoring that counter over time will allow the user to know when a topic backlog exceeded its quota and, if backlog eviction was the chosen action, that eviction happened and how many times.
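To illustrate what "monitoring that counter over time" means, the sketch below scans a series of scraped counter values and reports when it grew. The sample data and the function name are made up, and treating a drop in value as a counter reset (for example after a broker restart) is an assumption borrowed from common counter semantics.

```python
from typing import List, Tuple


def eviction_events(samples: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    """Given (timestamp, counter value) samples in time order, return
    (timestamp, increase) for each sample at which the counter grew."""
    events = []
    for (_, prev), (ts, cur) in zip(samples, samples[1:]):
        # A lower value than before is treated as a reset to zero.
        increase = cur - prev if cur >= prev else cur
        if increase > 0:
            events.append((ts, increase))
    return events


# Counter scraped every 60 seconds for one topic and quota_type="time".
samples = [(0, 0), (60, 0), (120, 2), (180, 2), (240, 3)]
events = eviction_events(samples)
```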

Some users may want the backlog quota check to happen more frequently and, as a consequence, the backlog age metric to be updated more frequently. They can modify the backlogQuotaCheckIntervalInSeconds configuration key, but without knowing how long this check takes, it will be hard for them to choose a suitable value. Hence, we will add the metric pulsar_storage_backlog_quota_check_duration_seconds, which will be of histogram type.
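For readers less familiar with histogram metrics, the sketch below shows how a set of observed check durations maps to cumulative buckets. The bucket boundaries and the durations are invented for the example; the actual boundaries of the metric are not defined in this proposal.

```python
from bisect import bisect_left
from typing import Dict, List


def cumulative_buckets(durations_s: List[float], bounds_s: List[float]) -> Dict[str, int]:
    """Count observations into cumulative "less than or equal" buckets,
    with a final +Inf bucket that holds every observation."""
    counts = [0] * (len(bounds_s) + 1)
    for d in durations_s:
        # Index of the first bound that is >= d, or the +Inf slot.
        counts[bisect_left(bounds_s, d)] += 1
    result = {}
    running = 0
    for label, c in zip([str(b) for b in bounds_s] + ["+Inf"], counts):
        running += c
        result[label] = running
    return result


durations = [0.05, 0.4, 0.9, 3.0, 7.0]  # hypothetical check durations, seconds
buckets = cumulative_buckets(durations, [0.1, 1.0, 5.0])
```

If most observations fall in buckets well below the configured interval, lowering the interval is less likely to cause checks to pile up.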

Detailed Design

Public-facing Changes

Public API

Adding the following to the response of topic stats, of both {tenant}/{namespace}/{topic}/stats and {tenant}/{namespace}/{topic}/partitioned-stats:

  • backlogQuotaLimitSize - the size in bytes of the topic backlog quota
  • backlogQuotaLimitTime - the topic backlog age quota, in seconds.
  • oldestBacklogMessageAgeSeconds - the age of the oldest unacknowledged (i.e. backlog) message, measured by the time elapsed from its published time, in seconds. This value is recorded every backlog quota check interval, hence it represents the value seen in the last check.
  • oldestBacklogMessageSubscriptionName - the name of the subscription containing the oldest unacknowledged message. This value is recorded every backlog quota check interval, hence it represents the value seen in the last check.
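A client consuming these fields might look roughly like the sketch below. The JSON payload is a made-up fragment containing only the four new fields with invented values; a real topic stats response carries many more fields.

```python
import json

# Hypothetical fragment of a topic stats response; values are invented.
sample_response = """
{
  "backlogQuotaLimitSize": 1073741824,
  "backlogQuotaLimitTime": 3600,
  "oldestBacklogMessageAgeSeconds": 2700,
  "oldestBacklogMessageSubscriptionName": "slow-sub"
}
"""

stats = json.loads(sample_response)

# Fraction of the time-based quota in use, as of the last quota check.
time_usage = stats["oldestBacklogMessageAgeSeconds"] / stats["backlogQuotaLimitTime"]

# Subscription to investigate when the usage is high.
culprit = stats["oldestBacklogMessageSubscriptionName"]
```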

Metrics

| Name | Description | Attributes | Units |
|---|---|---|---|
| pulsar_storage_backlog_age_seconds | Gauge. The age of the oldest unacknowledged message (backlog) | cluster, namespace, topic | seconds |
| pulsar_storage_backlog_quota_exceeded_evictions_total | Counter. The number of times a backlog was evicted since it has exceeded its quota | cluster, namespace, topic, quota_type = (time \| size) | |
| pulsar_storage_backlog_quota_check_duration_seconds | Histogram. The duration of the backlog quota check process. | cluster | seconds |
| pulsar_broker_storage_backlog_quota_exceeded_evictions_total | Counter. The number of times a backlog was evicted since it has exceeded its quota, in broker level | cluster, quota_type = (time \| size) | |

  • Since pulsar_storage_backlog_age_seconds cannot be meaningfully aggregated to a namespace level, it will not be included as a metric when the configuration key exposeTopicLevelMetricsInPrometheus is set to false.
  • pulsar_storage_backlog_quota_exceeded_evictions_total will be included as a metric also in namespace aggregation.
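Unlike the age gauge, the eviction counter sums naturally across topics. The sketch below shows one way to picture the namespace aggregation; the function, the key layout and the sample values are hypothetical and are not the broker's aggregation code.

```python
from collections import Counter
from typing import Dict, Tuple


def aggregate_by_namespace(per_topic: Dict[Tuple[str, str, str], int]) -> Counter:
    """Sum per-topic eviction counts into (namespace, quota_type) totals.
    Keys of `per_topic` are (namespace, topic, quota_type)."""
    totals = Counter()
    for (namespace, _topic, quota_type), count in per_topic.items():
        totals[(namespace, quota_type)] += count
    return totals


per_topic = {
    ("tenant/ns1", "orders", "time"): 2,
    ("tenant/ns1", "payments", "time"): 1,
    ("tenant/ns1", "orders", "size"): 4,
    ("tenant/ns2", "audit", "size"): 3,
}
totals = aggregate_by_namespace(per_topic)
```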

Alternatives

One alternative is to separate the backlog quota check into 2 separate processes, each running at its own frequency:

  1. Check backlog quota exceeded for all persistent topics. The result will be marked in memory. If precise time backlog quota was configured, then this will incur the I/O cost as described before.
  2. Evict messages for those topics marked.

This may enable more frequent updates to the backlog age metric, making it fresher, but the associated cost might be high, since it might result in more frequent I/O calls, especially with many topics. Another disadvantage is that it makes the backlog check and eviction more complex.

Links