TOC

Preface

Roughly 11 months ago, I started working on solving the biggest issue with Pulsar metrics: the lack of ability to monitor a Pulsar broker with a large topic count: 10k, 100k, and in the future 1M. This started by mapping the existing functionality and then enumerating all the problems I saw (all documented in this doc).

This PIP is a parent PIP. It aims to gradually solve (using sub-PIPs) all the current metric system's problems and provide the ability to monitor a broker with a large topic count, which is currently lacking. As a parent PIP, it will describe each problem and its solution at a high level, leaving fine-grained details to the sub-PIPs. The parent PIP ensures all solutions align and do not contradict each other.

The basic building block for solving monitoring of a large topic count is aggregating internally (into topic groups) and adding fine-grained filtering. We could have shoe-horned it into the existing metric system, but we thought adding that to a system already ingrained with many problems would be wrong and hard to do gradually, as so many things would break. This is why the second-biggest design decision presented here is consolidating all existing metric libraries into a single one - OpenTelemetry. The parent PIP will explain why OpenTelemetry was chosen out of existing solutions and why it far exceeds all other options. I’ve been working closely with the OpenTelemetry community for the past eight months: brainstorming this integration and raising issues, in an effort to remove serious blockers to making this migration successful.

I made every effort to summarize this document so that it is concise yet clear. I understand it is an effort to read it and, even more so, to provide meaningful feedback on such a large document; hence I’m very grateful to each individual who does so.

I think this design will help improve the user experience immensely, so it is worth the time spent reading it.

TL;DR

Working with Metrics today as a user or a developer is hard and has many severe issues.

From the user perspective:

  • One of Pulsar's strongest features is “cheap” topics, so you can easily have 10k - 100k topics per broker. Once you do that, you quickly learn that the number of metrics you export via “/metrics” (Prometheus-style endpoint) becomes massive. The cost to store them becomes too high, queries time out, or even the “/metrics” endpoint itself times out, due to the heavy CPU and memory cost of processing so many metrics.
  • The only option Pulsar gives you today is all-or-nothing filtering and very crude aggregation. You switch metrics from topic aggregation level to namespace aggregation level. Also, you can turn off producer and consumer level metrics. You end up doing it all, leaving you “blind”, looking at metrics from a namespace level which is too high level. You end up conjuring all kinds of scripts on top of the topic stats endpoint to glue together some aggregated metrics view for the topics you need.
  • Summaries (a metric type giving you quantiles like p95), which are used in Pulsar, can't be aggregated across topics / brokers due to their inherent design.
  • Plugin authors spend too much time on defining and exposing metrics to Pulsar, since the only interface Pulsar offers is writing your metrics yourself as UTF-8 bytes in Prometheus Text Format to a byte stream interface given to you.
  • Pulsar histograms are exported in a way that is not conformant with Prometheus, which means you can't get the p95 quantile on such histograms, making them very hard to use in day-to-day work.
  • Too many metrics are rates, which also delta-reset every interval you configure in Pulsar and on restart, instead of relying on cumulative (ever-growing) counters and letting Prometheus use its rate function.
  • And many more issues

From the developer perspective:

  • There are 4 different ways to define and record metrics in Pulsar: Pulsar's own metrics library, Prometheus Java Client, the Bookkeeper metrics library and plain native Java SDK objects (AtomicLong, ...). It's very confusing for the developer and creates inconsistencies for the end user (Summary, for example, is different in each).
  • Patching your metrics into “/metrics” Prometheus endpoint is confusing, cumbersome and error-prone.
  • Many more

This proposal offers several key changes to solve that:

  • Cardinality (supporting 10k-100k topics per broker) is solved by introducing a new aggregation level for metrics called Topic Metric Group. Using configuration, you specify for each topic its group (using wildcard/regex). This lets you “zoom out” to a granularity level between namespace and topic - groups - where you control how many groups you have, hence solving the cardinality issue without sacrificing too much level of detail.
  • A fine-grained, dynamic filtering mechanism. You‘ll have rule-based dynamic configuration, allowing you to specify per namespace/topic/group which metrics you’d like to keep/drop. Rules allow you to set the default to a small number of metrics at group and namespace level only and drop the rest. When needed, you can add an override rule to “open” up a certain group to have more metrics at higher granularity (topic or even consumer/producer level). Since it‘s dynamic, you “open” such a group when you see it misbehaving, inspect it at topic level, and when all is resolved, you can “close” it. A similar experience to logging levels in Log4j or Logback, where you set a default and override it per class/package.

Aggregation and filtering combined solve the cardinality issue without sacrificing the level of detail when needed, and most importantly, you determine which topic/group/namespace it happens on.

Since this change is so invasive, it requires a single metrics library to implement all of it on top of; hence the third big change is consolidating all four ways to define and record metrics into a single, new one: OpenTelemetry Metrics (the Java SDK, and also Python and Go for the Pulsar Function runners). Introducing OpenTelemetry (OTel) also solves the biggest pain point from the developer perspective, since it's a superb metrics library offering everything you need, and there is going to be a single way - only it. It also provides a robust integration point for plugin authors, who will use OpenTelemetry. It so happens that it also solves the numerous other problems described in the doc itself.

The solution will be introduced as another layer with feature toggles, so you can work with the existing system and/or OTel, until the existing system is gradually deprecated. Pulsar OTel metrics will support exporting as a Prometheus HTTP endpoint (/metrics, but on a different port) for backward compatibility, and also OTLP, so you can push the metrics to an OTel Collector and from there ship them to any destination.

It's a big breaking change for Pulsar users on many fronts: names, semantics, configuration. Read the end of this doc to learn exactly what will change for the user (at a high level).

In my opinion, it will make the Pulsar user experience so much better that users will want to migrate to it, despite the breaking changes.

This was a very short summary. You are most welcome to read the full design document below and express feedback, so we can make it better.

Background

What are metrics?

Any software we know in the world today exposes metrics to show, in an aggregated way, what transpires in it. A metric is defined as:

  • A name - e.g. pulsar_rate_in
  • Attributes/labels - the context in which the following number applies for. For example cluster=europe-cluster, topic=orders
  • Timestamp - the time at which the following number was measured. Usually presented in epoch seconds: 1679403600, which means Tuesday, March 21, 2023 1:00:00 PM
  • Value - a numerical value. For example, 15,345. For pulsar_rate_in it means 15,345 bytes received in the interval (e.g., 1min)

Composing it all together looks like this:

pulsar_rate_in{cluster="europe-cluster", topic="orders"} 15345 1679403600

Metrics usually come in all kinds of types:

  • Counter - a number that keeps increasing. Most of the time used to measure a rate.
  • Gauge - a number that can go up and down. Example: How many HTTP requests are in-flight right now to the Admin API for a given broker?
  • Histogram - most of the time, it’s an explicit bucket histogram: you define several buckets, and each bucket has a range of values. Each time you report a value to the histogram, it finds the bucket in which the value falls inside that range and increases its counter by 1.
  • Summary - You record values to it, for example the latency of API calls to write to Bookkeeper. Once requested, it will give you the percentiles of those values over the last X minutes or until it is reset. For example, the 95th percentile of the last two minutes is a number below which 95% of all values recorded to the summary over the last two minutes fall.

As opposed to logs which tell you a step-by-step story, metrics give you an aggregated view of Pulsar behavior over time.

Messaging Metrics

Pulsar's main feature is messaging: receiving messages from producers, and dispatching messages to consumers via subscriptions.

The metrics related to messaging are divided into a couple of aggregation levels: broker, namespace, topic, subscription, consumer and producer.

Some levels can have high cardinality, hence Pulsar offers several configuration options to minimize the number of unique time series exported, via the following toggles:

  • exposeTopicLevelMetricsInPrometheus - Setting this value to false will cause metrics to be reported at namespace-level granularity only, while if true, the metrics will be reported at topic-level granularity.
  • exposeConsumerLevelMetricsInPrometheus - Setting this value to false will filter out any consumer-level metric, i.e. filtering out pulsar_consumer_* metrics.
  • exposeProducerLevelMetricsInPrometheus - Setting this value to false will filter out any producer-level metric, i.e. filtering out pulsar_producer_* metrics.

OpenTelemetry Basic Concepts

  • Measurement - the number you record. It can be for example “5” ms in the case of HTTP request latency histogram, or +2 in the case of an increment to a counter
  • Instrument - the object through which you record measurements.
  • Instrument Types
    • Counter
      • A number that only increases
    • UpDown Counter
      • A number that can increase or decrease
    • Gauge
      • A number that can increase or decrease, but can’t be aggregated across attributes. For example: temperature. If room 1 has 35c and room 2 has 40c, you can’t add them to get a meaningful number as opposed to number of requests.
    • Histogram
      • Records numbers and when asked shows a statistical analysis on it. Example: explicit bucket histogram, which shows count per buckets, where each bucket represents a value range.
  • Attributes
    • List of (name, value) pairs. Example: cluster=eu-cluster, topic=orders
    • Usually when recording a value to an instrument, e.g. counter, you do it in the context of an attribute set.
  • Meter
    • A factory object through which you create instruments. All created through it belong to it.
    • A Meter has a name and a version. Pulsar can have a “pulsar” meter with its corresponding version. Plugins can have their own meter, with a matching version.
    • The name and version will be available via attributes when exported to Prometheus, or any other time-series database.
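
To make these concepts concrete, here is a minimal sketch of creating a Meter and a Counter and recording a measurement with an attribute set, assuming the OpenTelemetry Java API (the instrument name, version and attribute values are illustrative):

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;

public class MeterExample {
    public static void main(String[] args) {
        // A no-op OpenTelemetry instance for illustration; a real service would use a configured SDK.
        OpenTelemetry otel = OpenTelemetry.noop();

        // The Meter is the factory for instruments; it carries a name and version.
        Meter meter = otel.meterBuilder("pulsar").setInstrumentationVersion("3.0.0").build();

        // Create a Counter instrument through the Meter.
        LongCounter messagesIn = meter.counterBuilder("pulsar.messaging.in.messages")
                .setDescription("Number of messages received")
                .setUnit("{message}")
                .build();

        // Record a measurement (+2) in the context of an attribute set.
        Attributes attributes = Attributes.builder()
                .put("cluster", "eu-cluster")
                .put("topic", "orders")
                .build();
        messagesIn.add(2, attributes);
    }
}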

Motivation

The current metric system has several problems which act as the motivation for this PIP. Each subsection below explains the background to the problem and the actual problem. No prior knowledge is required.

Lack of a single way to define and record metrics

In Pulsar there are multiple ways to define and record metrics - i.e. several metric libraries:

  • Prometheus Client
    • Prometheus has a client library in Java, providing objects to define and record several types of metrics: Gauge, Histogram, Counter (a.k.a. Collectors)
    • The majority of the time, the static Collector registry is used. On some occasions, dedicated collector registries are created.
  • Pulsar Metrics library
    • Pulsar’s own metric library, providing objects to define and record several types of metrics:
      • Histogram: StatsBuckets
      • Rates: Rate.
      • Summary: Summary - An extension for Prometheus Client library providing a more performant version of Summary.
  • Bookkeeper Metrics API Implementation
    • Apache Bookkeeper (BK) has its own metrics library, divided into an API and SDK (implementation).
    • Pulsar has implemented the API, for several purposes
      • Integrate BK client metrics into Pulsar metrics exporter (described below).
      • Use BK objects which use the BK Metrics API and integrate their metrics into the Pulsar metrics exporter. Examples: OrderedExecutors.
        • The BK code used in Pulsar is the BK Client and OrderedExecutors.
      • Support Pulsar code which directly uses this API. PulsarZooKeeperClient and several Pulsar plugins are the most prominent examples.
  • Native Java SDK
    • Plain java objects: LongAdder to act as Counters, AtomicLong or primitive long with atomic updater to act as Gauge.

Having multiple metric libraries is a problem for several reasons:

  • Confusing
    • Developers don’t really know which one to use
  • Completely different
    • Each one of them is different from the others. Prometheus client uses labels to record values, while the Pulsar Metrics library and the native Java SDK store the labels separately and stitch them together only upon exporting
  • Different implementations for same exported type
    • Summary in the Pulsar Metrics library uses a fixed time window (1 min) to reset and start accumulating metrics, while the Prometheus Client summary uses a moving time window of 10 minutes.
    • StatsBucket by Pulsar Metrics library resets its bucket counters every interval (1 min) while Prometheus Client Histogram does not.
    • This creates confusion both for developers and users
  • Different usage
    • With Pulsar Metrics library and Java SDK, you must remember to follow certain conventions to reset the metrics explicitly and register them for export explicitly. With BK Metrics implementation and Prometheus Client, exporting is implicitly done for you.

I would summarize it with one word: confusion.

High topic count (cardinality) is not supported

Pulsar, as users know, is unique by allowing you to use a very high number of topics in a cluster - up to 1M. It’s not uncommon to find a broker with 10k up to 100k topics hosted on it.

For each topic, Pulsar exposes roughly 80 - 100 unique time series (metrics). A single broker with 100k topics will emit 10M unique time series (UTS). This usually results in the following for the user:

  • A single Prometheus, even for a single broker, will not suffice. This forces the user to switch to complicated distributed time series systems like Cortex, M3, VictoriaMetrics as they can horizontally scale.
  • If the user works with an observability vendor like DataDog or Logz.io, the cost of 10M UTS per broker, make it too expensive to monitor.
  • If the user is fortunate enough to have their own team dedicated to deploying a time series database, queries will most probably time out due to the huge number of time series they need to read. With vendors, queries will either time out or become too expensive to run.
  • Heavy performance cost on Pulsar in terms of CPU and memory allocation to handle the huge number of topics, which translates to many attribute sets.

Hence, the common user behavior is:

  • Toggle-off topic level metrics and below (consumer/producer/subscription), leaving them with only namespace level monitoring
  • Develop their own scripts to call topics stats Admin API, to filter only the metrics they need, and aggregate to a level with reasonable cardinality.
  • Ship it to a vendor and bear the high cost

The filtering supported today is toggle-based (all or nothing) for certain levels, hence very coarse. You can toggle between namespace and topic level. If you choose topic level, you can toggle consumer and producer level metrics separately.

The aggregations provided today are:

  • Broker level (just merged this March 2023)
  • Namespace level or Topic level (normal topics and the partitions of a topic, not partitioned topic level)

Summary is widely used, but not aggregatable across brokers / labels

Summary, as explained in the Background section, is used to provide quantile values like p95, p75 for certain measurements - mostly latencies, but sometimes sizes. It is widely used in Pulsar: Ledger Offloader, Resource Groups, Replicated Subscriptions, Schema Registry, Broker Operability, Transaction Management, Pulsar Functions and Topics, just to name a few.

The biggest problem with Summaries is that quantiles are not aggregatable mathematically. You can’t know what is the p95 of schema registry “get” operation latency across the cluster by knowing each p95 per broker. Any math you’ll do on it will result in large numerical error. This means that beyond the scope of a broker or label set (i.e. topic/partition) it’s unusable.

For example, the user is mostly interested in the topic publish latency, and not the topic-partition publish latency. Without aggregating, it’s impossible to know the publish latency for a topic with partitions, because we need to aggregate the publish latency of each topic-partition, but we can’t aggregate summaries as explained above.

This is not the case for Explicit Bucket Histograms or the new type called Exponential Bucket Histogram. They are aggregatable and produce quantile estimates (by interpolation) with a small margin of error.

Existing Prometheus Export is hard to use

Most metrics frameworks offer you objects you create and then register to a registry. The exporting is taken care of for you. In Prometheus, for example, you create a Counter and register it to the static collector registry. The exporter simply exports all collectors (the counter being one) registered to the static registry.

In Pulsar, since there are 4 different libraries you can use to define a metric, the exporter had to be written by Pulsar, to patch all of them together into a single response to the GET /metrics request.

If you used the Prometheus Client, you're all set, as that integration was written for you. The problem is that most usages are not of that library, since it has serious performance issues, especially with high cardinality metrics (like topics).

Using all other libraries, you're basically required to write a function with the following signature: void writeMetrics(SimpleTextOutputStream stream) for each class containing metrics. Then you add a call to that function in PrometheusMetricsGenerator. The stream argument is basically a byte array you write bytes into, which represents the body of the /metrics response about to be delivered. You need to be aware of that, and write the current state of each metric you have in your class in Prometheus exposition format.
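
For illustration, a hedged sketch of what this manual pattern typically looks like (the class, metric name and cluster value are made up; only a plain string write on the stream is assumed):

import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

class MyFeatureStats {
    private final LongAdder bytesIn = new LongAdder();
    private final String clusterName = "europe-cluster"; // shared label, added by hand in every class

    void writeMetrics(SimpleTextOutputStream stream) {
        // Hand-written Prometheus text formatting, repeated in every class that exports metrics.
        stream.write("pulsar_myfeature_bytes_in{cluster=\"" + clusterName + "\"} " + bytesIn.sum() + "\n");
    }
}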

This presents multiple problems:

  • The logic of printing metrics to a stream in Prometheus format is copied and pasted in many classes, as there are no types in this stream - it's just a byte array.
  • The Prometheus format dictates that all metric data points with the same metric name be written one after another. The current “API”, which just writes text to a stream (in Prometheus text format), collides with that since it does not enforce it. This forced Pulsar to find a complicated interim solution (see the PrometheusMetricStreams class), which holds a stream per metric name.
  • Sometimes even the logic of flushing the stream was implemented more than once (e.g. FunctionMetricsResource writing their own Writer using Heap-based ByteBuf instead of direct memory)
  • There’s no single place to define shared labels. For example the cluster label must be added manually by any function.
  • It’s error-prone - you can forget to follow all those steps to export
  • It’s confusing for developers
  • It’s a lot of work for developers, when adding metrics to their features

Integrating metrics as a plugin author is hard, labor-intensive and prevents common functionality

Plugins have their own metrics. Most plugins were written to run inside Pulsar (you supply JARs or NARs loaded on Broker initialization). Pulsar doesn't provide a single interface through which you create metric objects and register them and integrate with Pulsar metrics reporting. Due to that, the following happens:

  • Plugin authors choose all sorts of metric libraries: BK Metrics API and SDK, Prometheus, and more.
  • If they chose Prometheus and use the static collector, they need to do nothing; their metrics get emitted together with Pulsar's metrics. This is neither well known nor a typed way to define interfaces between Pulsar and plugins.
  • If they chose other libraries, Pulsar provides plugin authors a way to interface their metric library with Pulsar's, using the following interface: PrometheusRawMetricsProvider, which contains a single method: void generate(SimpleTextOutputStream stream). This basically means they need to implement this function so it reads the metrics from the framework they chose and writes them in Prometheus exposition format, in bytes (see the sketch after this list).
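
For illustration, a hedged sketch of a plugin implementing this low-level interface (the metric name and value source are made up, and the import packages are assumptions):

import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.common.util.SimpleTextOutputStream;

class MyPluginMetricsProvider implements PrometheusRawMetricsProvider {
    @Override
    public void generate(SimpleTextOutputStream stream) {
        // The plugin must read whatever metric library it chose and serialize
        // the values itself, in Prometheus exposition format, as raw text.
        long pendingRequests = 42; // would be read from the plugin's own metrics
        stream.write("my_plugin_pending_requests{cluster=\"europe-cluster\"} " + pendingRequests + "\n");
    }
}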

Due to that, most plugin developers are forced to write their metric exporting logic on their own, causing more work to be done for them.

Since the interface is very low level, it creates several difficulties going forward:

  • Making sure Prometheus metrics are printed according to its format (for each name, all attributes are printed one after the other) is very difficult. You can’t do that easily with the current interface.
  • If you want to introduce any common mechanism for filtering or any other work on those metrics, you can’t since it forces you to decode them from the text format, which will consume too much CPU and memory.

Lack of ability to introduce another metrics format

Due to the multiple libraries and the lack of a high level metrics interface for plugins, it's basically impossible to add another export format in a performant manner suitable for a latency sensitive system such as Pulsar. The metrics system today is coupled to the Prometheus format, thus preventing the addition of a new, better format.

Take for example, OTLP, a new protocol for Traces/Logs/Metrics. It’s more optimized than Prometheus, since for example, for histograms, it mentions the attributes once for all buckets, rather than repeating it for each bucket like Prometheus format.

OTLP can’t be added to Pulsar as another exporting mechanism, in current metric system.

Adding Rates is error-prone

The way Rate is built forces the developer to both:

  1. Manually add a print of the value of the new instance created, to the function which writes the metrics in Prometheus format to SimpleTextOutputStream (each class has such a method)
  2. Add a call to reset() of the Rate instance, to a function which runs periodically.

Neither is something a developer can figure out on their own, and therefore it is easy to forget to do, or even to call twice by mistake. It also wastes time learning how to do it.

Inline metrics documentation is lacking

Each metric, in Prometheus format, should contain a line starting with # HELP, which allows Prometheus to parse that line and add a description to the metric, later used by UIs like Grafana to better explain the available metrics.

Since there are 4 metric libraries, only the Prometheus Client offers a typed option for including a description. Most metrics don't use it, hence lack a decent help line.

Histograms can’t be visualized or used since not Prometheus conformant

The main histogram used in Pulsar is StatsBucket. It has two major problems:

  1. Bucket counters in it are reset to 0 every 1 min (configurable). This goes against the Prometheus assumption that bucket counters only increase. As such, it prevents using Prometheus functions on it like calculating quantiles (histogram_quantile), which is the main reason to use histograms.

  2. When exported, the bucket label is encoded in the metric name, and not as the le label as Prometheus expects, hence making it impossible to use histogram_quantile and calculate quantiles on it.

    For example: pulsar_storage_write_latency_le_10 should have been pulsar_storage_write_latency{le="10"}

Some metrics are delta-reset, making it easy to lose data on occasions

Rate, StatsBucket, Summary, some exported JVM metrics and Pulsar Function metrics are reset to 0 every 1 min (configurable). This means that if, for some reason, Prometheus or any other agent fails to scrape the metrics for several minutes, you lose visibility into Pulsar during those minutes. When using counters / histograms which are only incremented, the rate is calculated as a delta between counter values, so two measurements 5 minutes apart will still give you a decent average over that period. The same goes for histogram quantile calculation.

Prometheus client metrics use static registry, making them susceptible to flaky tests

Most usage of the Prometheus Client library is done via the static Collector registry. This exposes it to flaky behavior across tests, as static variables are shared across tests and not cleaned between them. With a non-static registry, a fresh registry is created for each test, but this is not the case here.

Function custom metrics are both delta reset and limited in types to summary

A Pulsar Function author has a single way to add metrics to their function: the method void recordMetric(String metricName, double value) on the BaseContext interface.

What it does is record the value in a Prometheus Summary named pulsar_function_user_metric_, under the label metric={metricName}.

This Summary metric is also being reset every 1min by the wrapper code running the user function.
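
For reference, a minimal sketch of how a function author records a custom metric today with the Pulsar Functions Java API (the function logic and metric name are illustrative):

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class WordLengthFunction implements Function<String, String> {
    @Override
    public String process(String input, Context context) {
        // Recorded into the pulsar_function_user_metric_ Summary under metric="input_length",
        // which the wrapper code resets every 1 min.
        context.recordMetric("input_length", input.length());
        return input;
    }
}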

It has the following problems:

  • The values are reset every 1 min, hence subject to data loss as presented above.
  • The user is forced to use a Summary only, and is not offered the ability to use types like Counter, Histogram or Gauge. Users find all sorts of hacks around it, such as representing counters using the summary's count and sum.

Inconsistent reporting of partitioned topics

There is a configuration specifying whether a Partitioned Topic, composed of several partitions (each being a topic), will be printed using the topic label only (i.e. topic=incoming-logs-partition-2) or split into topic and partition (i.e. topic=incoming-logs, partition=2).

The problem is that this configuration is only applied to messaging related metrics (namespace, topic, producer, consumer, subscription). It is not applied to any other metric which contains the topic label, such as Transactions, Ledger Offloader, etc. This creates inconsistency in reported metrics.

System metrics manually scraped from function processes override each other

Pulsar Functions are launched by instances (processes) of Pulsar Function Worker. It supports 3 types of runtimes (function launchers):

  1. Thread - run the function in a new thread inside the Function Worker process
  2. Process - launch a process, which will run wrapper code executing the function (in Java, Python and Go).
  3. Kubernetes - launching a Pod, running the same wrapper code as Process runtime.

The metrics of the wrapper code, which also include the function custom metrics (metrics the function author adds), are exposed on a /metrics endpoint by the wrapper code. In the case of Kubernetes, the pod is annotated such that the Prometheus operator will scrape those metrics directly. In the case of the Thread runtime, the metrics are integrated into the Function Worker metrics. In the case of the Process runtime, the Function Worker is the one scraping /metrics from each function process, concatenating the responses and adding them to the Function Worker /metrics.

The Process runtime also includes many JVM and system level metrics registered using the Prometheus Client built-in exporters. Because this is not Pulsar code, it doesn't contain any special label identifying the function.

When the Function Worker scrapes each /metrics endpoint, it simply concatenates the responses, and since no unique label exists, the metrics override each other.

For example, if a Function Worker launched 3 processes, one for each function, then each will contain a line like jvm_memory_bytes_used{area="heap"} 2000000, each with a different numeric value and no unique label. When concatenating the responses from the three function processes, we cannot tell which process each line arrived from, and they will override each other.

No consistent naming convention used throughout pulsar metrics

  • Some domains have a metric prefix, like pulsar_txn for transactions related metrics or pulsar_schema for Pulsar Schema metrics. Some don’t, like metrics related to messaging (topic metrics) - for example pulsar_bytes_in or pulsar_entry_size_le_*.
  • Some metrics start with brk_ while others start with pulsar_. Some are even replaced from brk_ to pulsar_ during metric export.

This makes it very hard:

  • Defining filters. If you want to exclude messaging-related metrics, you can't, as pulsar_* will also catch all of Pulsar's other metrics.
  • Composing dashboards: it's easier to type a domain prefix to zoom in on its metrics, like pulsar_ledgeroffloader_, but it's impossible for metrics such as messaging metrics which don't really have a prefix.

Goals

In Scope

  • Allow monitoring Pulsar broker with very high topic count (10k - 1M), without paying the price of high cardinality, by providing a mechanism which aggregates topic-level metrics to an aggregation level called Topic Metric Group which the operator controls dynamically.
  • Allow dictating (filtering) which metrics will be exported, per any granularity level metrics: namespace, topic metric group, topic, consumer, producer, subscription.
  • Replace Summary with Explicit Bucket Histogram
  • Consolidate metrics usage (define, export) to a single library (i.e. OpenTelemetry)
  • Provide a rich typed interface to hook into Pulsar metrics system for Plugin authors
  • Make adding a Rate robust and error-free
  • Make histogram reporting conformant with Prometheus when exported to Prometheus format
  • Stop using static metric registries
  • Provide the ability in the future to correlate metrics with logs and traces, by sharing context
  • Provide pluggable metrics exporting, supporting a more efficient protocol (i.e. OTLP)
  • Support the most efficient observability protocol, OTLP.
  • Stop using delta reset, everywhere, including function metrics
  • Provide rich typed interface to define metrics for Pulsar Functions authors
  • All Pulsar metrics are properly named following a well-defined convention, adhering to OTel Semantic Conventions for instrument and attribute naming where possible
  • All changes listed above will be at least as good, performance-wise (CPU, latency), as the current system
  • The new system should support the maximum number of topics supported by the current system (i.e. 4k topics) without filtering

Out of Scope

  • Pulsar client metrics

High Level Design

Consolidating to OpenTelemetry

We will introduce a new metrics library that will be the only metrics library used once this PIP implementation reaches its final phase. The chosen library is the OpenTelemetry Java SDK. Full details on what OpenTelemetry is and why it was chosen over other metric libraries are located in the Detailed Design section below. This section focuses on describing it at a high level.

OpenTelemetry is a project that provides several components: API, SDK, Collector and protocol. Its main purpose is defining a standard way and reference implementation to define, export and manipulate observability signals: metrics, logs and traces. In this explanation we’ll focus on the metrics signal. The API is a set of interfaces used to define and report metrics. The SDK is the implementation of that API and also adds functionality such as exporting metrics and the ability to change how certain metrics are collected and measured. The protocol is called OTLP, and it's the most efficient protocol to transmit metrics, logs and traces to their final destination (be it a database or a vendor). The Collector is a light-weight process that allows receiving/pulling metrics/logs/traces in various formats, transforming them (i.e. filter, aggregate, maintain state, etc.), and exporting them to many destinations (many vendors and databases).

The project’s API also has a part for reporting logs and traces. One of its core features is the ability to share common context across metrics, logs and traces reporting (think of a thread-local context holding attributes that are attached to metrics, logs and traces alike, hence creating a link between them).

In this PIP scope we will only use the Metrics API and SDK. Specifically we will use the Java SDK in Pulsar Broker, Proxy and Function Worker, but also the Go and Python SDK for the wrapper code which executes functions written in Go and Python.

We will keep the current metric system as is, and add a new layer of metrics using the OpenTelemetry Java SDK: all of Pulsar’s metrics will also be created using OpenTelemetry. A feature flag will allow enabling OpenTelemetry metrics (init, recording and exporting). All the features and changes described here will be done only in the OpenTelemetry layer, allowing the old version to keep working until you’re ready to switch to the OTel (OpenTelemetry) implementation. In the far future, once OTel usage has stabilized and become widely adopted, we’ll deprecate the current metric system and eventually remove it. We will also make sure there is a feature flag to turn off the current Prometheus-based metric system. There‘s no need to have an abstraction layer on top of both the current metric system and the new one (OTel), as the OpenTelemetry API is the abstraction layer, and it’s the industry standard.

One very big breaking change (there are several described in the High Level Design, and also summarized in the Backward Compatibility section) is the naming. We are changing all metric names for several reasons:

  1. Attribute names (a.k.a. labels) will utilize the Semantic Conventions defined in OTel.
    1. OpenTelemetry defines agreed-upon attribute names for many attributes in the industry.
  2. Histograms bucket names will be properly encoded using le attribute (when exported to Prometheus format) and not inside the metric name.
  3. Each domain in Pulsar will be properly prefixed (messaging, transactions, offloader, etc.)

The move to OpenTelemetry is not without cost, as we both need to improve the library and adjust the way we use metrics in Pulsar with it. The Detailed Design contains a section detailing exactly what we need to improve in OpenTelemetry to make sure it fits Apache Pulsar. It mostly consists of making it performant (almost allocation free), plus small additions like removing the attribute set limit per instrument.

The changes we need to make in Pulsar to use it are detailed in this high level design section below. It’s composed of two big changes: changing all histograms to be only at namespace level instead of topic level, and switching from Summary to Histogram (Explicit Bucket).

There will be a sub-PIP detailing exactly how OpenTelemetry will be integrated into Pulsar, including how users will be able to configure it: their own views, exporters, etc. We need to see how to support that, given we want to introduce our own filtering predicate.

Here's a very short idea of how the code will look using OpenTelemetry. In reality, we will use a batch callback and a different design. The sub-PIP will specify the exact details.

class TopicInstruments {
    // ...
    meter.counterBuilder("pulsar.messaging.topic.messages.received.size")
            .setUnit("bytes")
            .buildWithCallback(observableLongMeasurement -> {
                for (PersistentTopic topic : getTopics()) {
                    long size = topic.getMessagesReceivedSize();
                    observableLongMeasurement.record(size, topic.getAttributes());
                }
            });
    // ...
}


class PersistentTopic {
    // ...
    Attributes topicAttributes;
    LongAdder messagesReceivedSize;
    // ...

    void init(String topicName) {
        topicAttributes = Attributes.builder()
                .put("topic", topicName)
                .put("namespace", namespaceName)
                .build();
    }
    // ...

    void messageReceived() {
        // ...
        messagesReceivedSize.add(msgSize);
    }

    long getMessagesReceivedSize() {
        return messagesReceivedSize.sum();
    }
}

Aggregate and Filtering to solve cardinality issues

Aggregation

Cardinality issues are mostly caused by topics, since Pulsar supports up to 1M topics cluster-wide, which roughly translates to 100k topics per single broker. The best way to solve this is to reduce the cardinality.

We will introduce a new term called Topic Metric Group. Each topic will be mapped to a single group. The groups are the tool we’ll use to reduce cardinality to a decent level, which roughly means 10 - 10k - a cardinality your time series database can handle properly.

For each metric Pulsar currently has at topic level (topic is one of its attributes), we will create another metric, bearing a different name, which will have its attributes at the group level (topicMetricGroup will be the attribute, without the topic attribute). Each time we record a value in a topic metric, we’ll also record it in the equivalent Topic Metric Group metric. For consistency, we will do the same for the namespace level.

Example:

If today you chose topic level metric, which means you don’t have namespace level metrics, you’ll have the following metric:

pulsar_in_messages_total{topic="orders_company_foobar", namespace="finance", ...}

After the change (including naming changes discussed later), you’ll have:

pulsar_messaging_namespace_in_messages_total{namespace="finance"}
pulsar_messaging_group_in_messages_total{topicMetricGroup="orders", namespace="finance"}
pulsar_messaging_topic_in_messages_total{topic="orders_company_foobar", topicMetricGroup="orders",
                namespace="finance"}

There will be a dynamic configuration allowing you to specify the mapping from topic to topic group in a convenient way. Since there can be many ways to configure that, we’ll define a plugin interface which will provide that mapping given a topic, and we’ll provide a default implementation for it. This will allow advanced users to customize it to their needs.
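
As a rough illustration only, the mapping plugin interface could look something like the sketch below (the interface name, method and example group are hypothetical, not the final design):

public interface TopicMetricGroupMapper {
    /**
     * Returns the Topic Metric Group for the given fully-qualified topic name,
     * e.g. "persistent://public/finance/orders_company_foobar" -> "orders".
     * The default implementation would resolve this from the wildcard/regex
     * mapping supplied via dynamic configuration.
     */
    String groupOf(String topicName);
}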

The detailed design section contains a more detailed description of this mapping configuration. There will be a sub-PIP detailing how the mapping will be configured, which tools we need to add to allow knowing which topics are in each group, and more.

Filtering

The aggregation above using both namespace and Topic Metric Group, reduces the cardinality. The only thing left for us to do is add a great user experience tool to control which metrics you’d like to see exported, in fine granularity. A bit like logging levels, but with more fine-grained controls.

We would like to have a dynamic configuration allowing us to toggle which metric we wish to keep or drop. For example:

  • Keep namespace and topic metric group level metrics, for any namespace, any group.
  • For any topic, keep only backlog size metric, and drop anything else under “messaging” scope metrics.
  • For topic “incoming-logs”, keep all of its metrics.
  • For topic group “orders”, keep all metrics at topic level, but drop producer/consumer level metrics.
  • For topic “billing”, keep all metrics (up to producer/consumer level).

We will introduce a new configuration, containing Filtering Rules, evaluated in order. Each rule will define a selector allowing you to select (metric, attributes) pairs based on all sorts of criteria, and then defining actions such as drop all, keep all, keep only, drop only, determining for each (metric, attribute) pair if it should be filtered out or not.

This configuration will be dynamic, hence allowing you to build namespace / group level dashboards to monitor Pulsar. Once you see a group misbehaving, you can dynamically “open” the filter to allow that group’s topic metrics, and you can decide to only allow the specific metric you suspect is problematic. Once you find the topic, you allow (stop dropping) all of its metrics, to enable you to debug it. Once you’re done, you can roll back to group level metrics.

We want to allow users to customize and build a filter matching their needs, hence we’ll create a plugin interface which can decide, for a given metric (name, attributes, unit, etc.), whether it should be filtered or not.
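
As a rough illustration only, such a filtering plugin could look like the following sketch (names are hypothetical; the real interface will be defined in the sub-PIP):

import io.opentelemetry.api.common.Attributes;

public interface MetricsFilter {
    /**
     * Decides whether the data point identified by the instrument name, unit and
     * attribute set should be exported (true) or dropped (false).
     */
    boolean shouldExport(String instrumentName, String unit, Attributes attributes);
}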

The detailed design section will include a more detailed description of the plugin interface and the default implementation's configuration file. It will also include an explanation of how we plan to implement this in stages: a 1st stage on our own and a 2nd stage by introducing a push-down predicate into the OpenTelemetry MetricReader.

Removing existing metric toggles

The fine-grained filtering mechanism described above makes the configuration toggles we have today redundant, hence we will remove them. This includes exposeConsumerLevelMetricsInPrometheus, exposeTopicLevelMetricsInPrometheus and exposeProducerLevelMetricsInPrometheus.

Summary

The aggregation to groups and namespaces, which are of reasonable cardinality, coupled with the ability to decide via filters which metrics (and which specific attribute sets within them) you wish to export, solves the cardinality issue. Visibility into monitoring data is not sacrificed, since the dynamic configuration allows you to zoom in to get the finer details and later shut it off.

Changing the way we measure metrics

Moving topic-level Histograms to namespace and broker level only

Histograms are primarily used to record latencies: pulsar_storage_write_latency_le_*, pulsar_storage_ledger_write_latency_le_*, and pulsar_compaction_latency_*

We have several issues with them:

  • They cost a lot. Each histogram today translates to 12 unique time series, which means it costs roughly 12x more than a counter.
  • A broker is mostly a multi-tenant process by design. It’s almost never a single topic’s fault for latency, and even if it is, other topics will be affected as well. Contention is mostly on broker CPU and on Bookkeeper, both of which are shared across topics. Having it at topic level won’t help to diagnose a topic root cause, and may sometimes mislead you.

The other problem we have is the topic move. Topics can move between brokers, due to automatic load balancing, or an operator decision to unload a topic. Today, when a topic is unloaded, the broker stops reporting the metrics for it.

In OTel, the API doesn't support remove() for an attribute set on an instrument (counter, histogram, etc.). Normally, if it was supported, we could have called remove for each instrument which contains an attribute-set for that topic.

OTel has two categories of instruments: synchronous and asynchronous. Synchronous instruments keep the value of the object in memory and update it upon recording a new value (behind the scenes, for a Counter it adds, for example, +2 to an AtomicLong it maintains per attribute set). Asynchronous instruments, on the other hand, are defined using a callback. When a call is made to collect the values for each instrument, the callback is invoked to retrieve the (attributes, value) pairs. When you record a value to an attribute set in a synchronous instrument, it will remain in memory until restart. Asynchronous instruments, on the other hand, retrieve the (attributes, value) pairs upon collection and forget them afterward.

Since remove() doesn't exist for instruments, async instruments are the closest thing we have in OTel. Thus Counter and UpDownCounters for topic instruments can be asynchronous instruments, hence when a topic is unloaded, the next callback will simply not record their values.

Histograms are problematic in that sense, since they don't yet have an asynchronous version - they are only synchronous. Hence, if we use them for topic instruments, used attributes can never be cleared from memory, so we’ll have an “attributes” (topic) leak, as over time it will only grow. This is why currently we can’t use them for topics or topic groups.

We have opened an issue and are making progress, but this is a long process.

Coupling the two together - cost, confusion, lack of clear use, and lack of support from OTel - we will change those topic level histograms to be namespace / broker level.

Integrating Messaging metrics into Open Telemetry

As explained before, instruments don’t support remove(). Topic, its subscriptions, producers and consumers - each has its own set of metrics. Also, each is ephemeral. Topics can move or be deleted, producers and consumers can stop and start many times, and subscriptions move with topics.

Hence, for topics, producers, consumers and subscriptions (and topic groups) we will use asynchronous instruments. This means we’ll keep the state using our own LongAdder, AtomicLong or primitive long with an atomic updater. When creating the asynchronous instrument, the callback will retrieve the value from the variable. For example, when a producer is removed, in the next collection cycle the callback will not report metrics for it since it doesn't exist, and thus it will disappear from OTel's memory as well.

OTel has a special batch callback mechanism, allowing you to supply a single callback for multiple instruments, making it more efficient, and we plan to use it.
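
A minimal sketch of what this could look like with the OpenTelemetry Java API is shown below (the instrument names and the PersistentTopic accessor methods are illustrative; the real design will be specified in the sub-PIP):

import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import java.util.List;

class TopicMetricsRegistration {
    void register(Meter meter, List<PersistentTopic> topics) {
        // Observers for two topic-level instruments, reported by a single batch callback.
        ObservableLongMeasurement messagesIn = meter.counterBuilder("pulsar.messaging.topic.in.messages")
                .buildObserver();
        ObservableLongMeasurement bytesIn = meter.counterBuilder("pulsar.messaging.topic.in.bytes")
                .setUnit("By")
                .buildObserver();

        // One callback reports all currently-known topics; an unloaded topic simply
        // stops appearing in the next collection cycle.
        meter.batchCallback(() -> {
            for (PersistentTopic topic : topics) {
                messagesIn.record(topic.getMessagesInCount(), topic.getAttributes());
                bytesIn.record(topic.getBytesInCount(), topic.getAttributes());
            }
        }, messagesIn, bytesIn);
    }
}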

As explained before, we’ll have instruments per aggregation level: broker, namespace, topic group, topic, subscription, producer and consumer.

Broker and namespace level will use synchronous instruments, since the number of namespaces is not expected to have high cardinality, hence not removing them is not a big attribute leak issue. Asynchronous instruments are also needed as explained in the “Supporting Admin Rest API Statistics endpoints” section below.

Switching Summary to Histograms, in namespace/broker level only

Summary by design can’t be aggregated across topics or hosts (See Background and Motivation). That was the reason OTel doesn‘t support them. I opened an issue for that, but it doesn’t seem like something that can be added to OTel.

Another consideration is CPU. In benchmarks done, it seems that updating the summaries, which are based on Apache DataSketches, costs 5% of CPU time, which is a lot compared with a simple +1 to a bucket counter in an explicit bucket histogram.

Due to those reasons, it makes sense to switch all summaries to histograms (explicit bucket type).

For the same reasons as histograms, they will be broker/namespace level only (not topic).

Most summaries are used for latency reporting. For the same multi-tenancy reasons, and after careful inspection of existing summaries, we’ve concluded we can convert them to namespace level. The domains affected by this are:

  • LedgerOffloader stats
  • Replication of subscription snapshot
  • Transactions
    • Transaction Buffer client
    • Pending Acks

The complete list of which summaries are affected is at the Detailed Design section

Pulsar Functions metrics use a Summary for user-defined metrics, but we assume the quantiles are actually meaningless, since some users record the value “1” just to obtain a count, and some record a value to obtain a sum. We will convert them to a Histogram without buckets, providing just sum and count.

  • Each custom metric is actually an attribute set bearing the attribute metric={user-defined-name}
  • We will define that in the init based on instrument name using views.

Specifying units for Histograms

We’ve inspected our summaries and histograms, and it seems the bucket ranges are common per unit: ms, seconds and bytes.

OTel has the notion of views. They are created upon init of OTel and can be used to specify the buckets for a set of instruments based on an instrument selector (name wildcard, …). We have opened an issue for the OTel SDK Specification, which was merged, to allow specifying a unit in an instrument selector. We only need to implement it in the OTel Java SDK (see the issue I’ve opened).
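
Until unit-based selection lands, here is a sketch of how common latency buckets can already be configured for a set of instruments with a name-based view, assuming the OpenTelemetry Java SDK (the bucket boundaries and the name pattern are illustrative):

import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.View;
import java.util.List;

class LatencyBucketsConfig {
    static SdkMeterProvider build() {
        return SdkMeterProvider.builder()
                .registerView(
                        InstrumentSelector.builder()
                                .setType(InstrumentType.HISTOGRAM)
                                .setName("pulsar.*.latency")   // wildcard instrument selector
                                .build(),
                        View.builder()
                                .setAggregation(Aggregation.explicitBucketHistogram(
                                        List.of(0.5, 1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 1000.0)))
                                .build())
                .build();
    }
}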

Removing Delta Reset

As described in the motivation and background sections, Pulsar resets certain types of metrics every configurable interval (i.e. 1 min): rates, explicit bucket histograms and summaries.

As also explained, this makes histograms hard to use, and is redundant and less accurate for rates. Most time-series databases can easily calculate rates based on ever-increasing counters.

Hence, in OpenTelemetry we won’t report rates; we’ll switch to counters. That of course means name changes, but as described in this document, all names will be changed anyhow.

For histograms, we will simply never reset them.

Summaries are converted to histograms so also never reset anymore.

We will keep Rates around primarily for the statistics returned through Admin API (i.e. Topic Stats, …), but they will never be exposed through OpenTelemetry.

It is worth noting that OpenTelemetry supports the notion of Aggregation Temporality. In short, it allows you to define for a given instrument whether you want it exported as delta or cumulative. Some exporters like Prometheus only support cumulative, and will override it all to be such. OTLP supports delta. Currently, we’re not explicitly supporting configuring views / readers to allow that, but it’s something the community can very easily add in the future. It will be perfect for people using OpenSearch or Elasticsearch.

Reporting topic and partition all the time

Today a topic is reported using the attribute topic={topicName}. If the topic is actually a partition of a partitioned topic, it will look like topic={partitionedTopicName}-partition-{partitionNum}. There is a configuration named splitTopicAndPartitionLabelInPrometheus which makes it be reported instead as topic={partitionedTopicName}, partition={partitionNum}.

This is not consistent, in that not all metrics using the topic attribute apply the split accordingly.

In OTel metrics we will ignore that flag and always report it as topic={partitionedTopicName}, partition={partitionNum}. We will make it consistent across any topic attribute usage. Eventually this flag will be removed (probably in the next major version of Pulsar).

Metrics Exporting

OTel has a built-in Prometheus MetricReader (and exporter) which exposes /metrics endpoint, which we will use.
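
For reference, wiring the built-in reader is roughly as follows, assuming the opentelemetry-exporter-prometheus artifact (the port is illustrative and would be separate from the existing /metrics port):

import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;

class PrometheusExporterSetup {
    static SdkMeterProvider build() {
        // Registers a MetricReader that serves /metrics in Prometheus format on the given port.
        return SdkMeterProvider.builder()
                .registerMetricReader(PrometheusHttpServer.builder().setPort(9464).build())
                .build();
    }
}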

Pulsar current metric system has a caching mechanism for /metrics responses. This was developed since creating the response for high topic count was a CPU hog and in some cases memory hog. In our case we plan to use Filtering and Aggregation (Metric Topic Group) to drive the response to a reasonable size, hence won’t need to implement that in OTel metrics.

OTel also has built-in OTLP exporter. OTLP is the efficient protocol OTel has, which OTel Collector supports, and some vendors as well. We wish to use it, yet it seems that it is very heavy on memory allocation. Hence, we will need to improve it to make it allocation free as much as possible.

Avoiding static metric registry

OTel supports a static (Global) OTel instance, but we will refrain from it to make sure test data doesn't leak between tests.

In OTel we will create an instance of MeterProvider during Pulsar init. This object is the factory for Meter which by itself is a factory for instruments (Counter, Histogram, etc.). We will pass along a Pulsar Meter instance to each class that needs to create instruments. The exact details will be detailed in a sub-PIP of adding OpenTelemetry to Pulsar.

Metrics documentation

OTel doesn't force you to supply documentation.

We will create a static code analysis rule failing the build if it finds instrument creation without description.

We will optionally try to create an automated way to export all metrics metadata - instrument name and description - in an easy-to-read format to be used for documentation purposes on the Pulsar website.

Integration with BK

BookKeeper (client and server) has its own custom metrics library. It’s built upon a set of interfaces. The Pulsar metric system has an implementation of it.

We will create another implementation to patch it to OTel.

Integrating with Pulsar Plugins

We will modify all popular plugin interfaces Pulsar has, such that they will accept an OpenTelemetry instance, used to grab the MeterProvider instance and create their own Meter with the plugin name and version, which they will then use to create their own instruments.

A user who has decided to turn on OTel metrics will have to verify that all Pulsar plugins they use have been upgraded to use the modified interface, otherwise their metrics will not be exported.

Plugin authors will need to release a new version of their plugin which implements the new interface and registers the metrics using OpenTelemetry.

One big advantage of using OTel is that it supports plugins running in stand-alone mode. Some plugins have the option to run some of their code outside Pulsar. By using the OTel API, they can integrate either via their own SDK or Pulsar's SDK (via the OpenTelemetry instance).

A detailed list is at the Detailed Design.

A sub-PIP will be dedicated to integrating with plugins.

Supporting Admin Rest API Statistics endpoints

Pulsar has several REST API endpoints for retrieving detailed metrics. They are exposing rates, up-down counters and counters.

OTel instruments don't have methods to retrieve the current value. The only facility exposing that is the MetricReader, which reads the entire metric set. Thus, any metric that is also exposed through the Admin REST API will have to have its state maintained by Pulsar, either using a LongAdder, AtomicLong or primitive long with an atomic updater. The matching OTel instrument will be defined as an asynchronous instrument, meaning it is defined by supplying a callback function that will be executed to retrieve the instrument's current value. The callback will simply retrieve the state from the matching LongAdder, AtomicLong or primitive long (or double, to that end) and return it as the current value.

Fixing Rate

We will change Rate in such a way that the user creating it won't be required to call reset() periodically and manually. All rates will be created via a manager class of some sort, and it will be the one responsible for scheduling resets. Rates will always expose a sum and a count counter, to save on multiple variables. A sub-PIP will explain in detail how this will be achieved.

Function metrics

Background

Pulsar supports the notion of Pulsar Functions. These are user-supplied functions, in Go, Java or Python, which can read and write messages. You can use them to read all messages from a topic and write them to an external system like S3 (Sink), or the other way around: read from an external system and write to a topic (e.g. a DB change log to a Pulsar topic - Source). The other option is simply transforming the message received and writing it to another topic.

A user can submit a function, and can also configure the amount of instances it will have.

The code responsible for coordinating the execution of those functions is located in a component called the Function Worker, which can run as a stand-alone process or as part of the Pulsar process. You can run many Function Workers, yet only one functions as the leader.

The leader takes care of splitting the work of executing the function instances between the different Function Workers (for load balancing purposes).

The Function worker has three runtimes, as in, three options to execute the functions it is in charge of:

  1. Thread: Creating a new thread and running the function in it.
  2. Process: Creating a new process and running the function in it.
  3. Kubernetes: Creating a Deployment for each function.

Each Function Worker has its own metrics it exposes.

Each Function has its own metrics it exposes.

In the Thread runtime, all metrics are funneled into Pulsar metrics (by exposing a method which writes them into the SimpleTextOutputStream).

In the Process runtime, the function is executed in its own process, hence there is a wrapper main() function executing the user supplied function in the process. This main() function has general function execution metrics (e.g. how many messages received, etc.), and also the function metrics (user custom metrics). All the metrics are defined using a single library: Prometheus client. The metrics are exposed using the client’s HTTP server exposing /metrics endpoint exposing the two categories of metrics.

In the Kubernetes runtime, the pod is annotated with the Prometheus Operator annotations, which are expected to be installed; hence Prometheus scrapes those metrics directly from the process running in the pod.

In the Process runtime, the Function Worker iterates over all the processes it launched and issues a GET request to /metrics for each. The responses are concatenated together and written to the SimpleTextOutputStream.

The general function-execution metrics come in two forms: cumulative and 1-minute. The latter's names end with _1min, and they get reset every minute (e.g., *_received_1min).

Each launched process also runs a gRPC server supporting commands. Two of those are related to metrics: resetMetrics and getAndResetMetrics. They reset all metrics - both the general framework ones and the custom user ones.

The Prometheus client was also configured to emit several metrics using built-in exporters: memory pools, JMX metrics, etc.

Solving it in OpenTelemetry

In phase 1, we'll keep the reporting of user-defined metrics for function authors as is, and mainly focus on the other issues: metrics scraping for each runtime and the 1min metrics. In phase 2, we'll also add the option for Pulsar Function authors to define metrics via OTel.

Collecting metrics

Thread Runtime

The framework will use the Pulsar OTel SDK, or the stand-alone Function Worker's SDK; thus, whatever export method that SDK uses (Prometheus, OTLP, …), the function metrics will use as well.

Kubernetes Runtime

OTel supports exposing metrics via a /metrics endpoint using the Prometheus format. We'll support the same behavior as is done today with the Prometheus client.

We'll also support configuring the pod so it can push metrics via OTLP to a defined destination.
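For illustration, and assuming the standard opentelemetry-java Prometheus and OTLP exporter artifacts, the SDK in the function pod could register either kind of reader (the port and endpoint values are placeholders):

import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import java.time.Duration;

class FunctionPodMetrics {
    // Option 1: expose /metrics in Prometheus format, scraped via the pod annotations.
    static SdkMeterProvider withPrometheus() {
        return SdkMeterProvider.builder()
                .registerMetricReader(PrometheusHttpServer.builder().setPort(9464).build())
                .build();
    }

    // Option 2: push metrics over OTLP to a configured destination (e.g. an OTel Collector).
    static SdkMeterProvider withOtlpPush(String endpoint) {
        return SdkMeterProvider.builder()
                .registerMetricReader(
                        PeriodicMetricReader.builder(
                                        OtlpGrpcMetricExporter.builder().setEndpoint(endpoint).build())
                                .setInterval(Duration.ofSeconds(60))
                                .build())
                .build();
    }
}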

Process Runtime

The existing /metrics scraping solution was not good:

  • It violated the Prometheus format: lines with the same metric name but different attributes must appear one after another, but in reality the responses were concatenated as is.
  • The Prometheus exporter metrics, such as memory pools, didn't have any attribute unique to each process; hence, when concatenated, different processes produced lines with the same name and same attributes but different values, and data was lost.

OTel supports both exposing metrics in the Prometheus format and pushing them using OTLP. Making the Function Worker pull the metrics - in effect, acting as a hub - is overly complicated. It's much easier to simply let the processes be scraped, or push OTLP metrics, configured the same way Pulsar is.

In phase 1 we won't support the Process runtime.

In phase 2, we can use Prometheus HTTP Service Discovery and expose such an endpoint in the Function Worker leader. Via the health pings it receives from each worker, the workers can also report each process's metrics port, thus allowing Prometheus to scrape the metrics directly from each process. We'll gather feedback from the community to see how important the Process runtime is, as the Kubernetes runtime is much more robust.

Removing 1min metrics

We will not define any 1min metrics. Any TSDB can calculate them from the cumulative counters.

Supporting getMetrics RPC

We can define our own MetricReader and filter its output to return the same metrics as we return today: the general function metrics and the user-defined metrics.
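A possible sketch, assuming a recent opentelemetry-java SDK where CollectionRegistration exposes collectAllMetrics(); the metric-name prefix used for filtering is a placeholder:

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.CollectionRegistration;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import java.util.List;
import java.util.stream.Collectors;

class GetMetricsRpcReader implements MetricReader {
    private volatile CollectionRegistration registration;

    @Override
    public void register(CollectionRegistration registration) {
        this.registration = registration;   // called by the SDK when the reader is registered
    }

    /** Called by the getMetrics gRPC handler: collect everything, keep only function metrics. */
    List<MetricData> collectFunctionMetrics() {
        if (registration == null) {
            return List.of();
        }
        return registration.collectAllMetrics().stream()
                .filter(md -> md.getName().startsWith("pulsar.function."))  // placeholder prefix
                .collect(Collectors.toList());
    }

    @Override
    public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
        return AggregationTemporality.CUMULATIVE;
    }

    @Override
    public CompletableResultCode forceFlush() { return CompletableResultCode.ofSuccess(); }

    @Override
    public CompletableResultCode shutdown()   { return CompletableResultCode.ofSuccess(); }
}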

Removing resetMetrics RPC

OTel doesn't support resetting metrics, and resetting also violates Prometheus expectations, since metrics are expected to be cumulative. Thus, we will remove that method.

Supporting Python and Go Functions

OTel has SDKs for Python and Go, so we'll use them to export metrics.

Summary

A sub-PIP will be created for Function metrics, which will include its detailed design.

Detailed Design

Topic Metric Group configuration

As mentioned in the high-level design section, we'll have a plugin interface, allowing multiple implementations and customization of the way a topic is mapped to a group.

The default implementation this PIP will provide will be rule-based, as described below.

We’ll have a configuration, that can look something like this:

bi-data                      // group name
  namespace = bi-ns          // condition of the form: attribute name = expression
  topic = bi-*               // condition of the form: attribute name = expression

incoming-logs
  namespace = *
  topic = incoming-logs-*

The configuration will contain a list of rules. Each rule begins with a group name, followed by a list of matchers: one for the namespace and one for the topic. The rules will be evaluated in order, and once a topic matches a rule, we stop iterating.
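A hedged sketch of what the mapper plugin and the default rule-based implementation might look like (the interface name, method signature, and rule shape are assumptions; the sub-PIP will define the real API):

import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

interface TopicMetricGroupMapper {
    /** Returns the metric group for the given topic, or empty if no rule matches. */
    Optional<String> groupOf(String namespace, String topic);
}

class RuleBasedTopicMetricGroupMapper implements TopicMetricGroupMapper {
    record Rule(String groupName, Pattern namespaceMatcher, Pattern topicMatcher) {}

    private final List<Rule> rules;

    RuleBasedTopicMetricGroupMapper(List<Rule> rules) {
        this.rules = List.copyOf(rules);
    }

    @Override
    public Optional<String> groupOf(String namespace, String topic) {
        // Rules are evaluated in order; the first rule whose matchers both match wins.
        for (Rule rule : rules) {
            if (rule.namespaceMatcher().matcher(namespace).matches()
                    && rule.topicMatcher().matcher(topic).matches()) {
                return Optional.of(rule.groupName());
            }
        }
        return Optional.empty();
    }
}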

There will be a sub-PIP detailing the plugin and default implementation in fine-grained detail: where it will be stored, how it will support changing it dynamically, performance, etc.

Integration with Pulsar Plugins

This is the list of plugins Pulsar currently uses:

  • AdditionalServlet
  • AdditionalServletWithPulsarService - we can use PulsarService to integrate
  • EntryFilter - need to add init method to supply OTel
  • DelayedDeliveryTrackerFactory - accepts PulsarService
  • TopicFactory - accepts BrokerService
  • ResourceUsageTransportManager - need to add init method
  • AuthenticationProvider - need to add parameter to initialize method
  • ModularLoadManager - accepts PulsarService
  • SchemaStorageFactory - accepts PulsarService
  • JvmGCMetricsLogger - need to add init().
  • TransactionMetadataStoreProvider - need to add init
  • TransactionBufferProvider - need to add init
  • TransactionPendingAckStoreProvider - need to add init
  • LedgerOffloader - need to add init()
  • AuthorizationProvider - need to add parameter to initialize method
  • WorkerService - need to modify init methods
  • PackageStorageProvider
  • ProtocolHandler - need to modify init method
  • BrokerInterceptor - accepts PulsarService
  • BrokerEntryMetadataInterceptor

In a sub-PIP we will consider how to update those interfaces to effectively allow passing an OpenTelemetry instance, so plugins can either create their own Meter or get access to an auxiliary class and be supplied only a Pulsar Meter. Note that each Meter has its own name and version, and those are emitted as two additional attributes - e.g. {..., otel_scope_name="s3_offloader_plugin", otel_scope_version="1.2", ...} - to avoid any metric name collision.
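For illustration, a plugin handed an OpenTelemetry instance could create its own scoped Meter like this (the offloader class and instrument name are placeholders):

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;

class S3OffloaderMetrics {
    private final LongCounter offloadedEntries;

    S3OffloaderMetrics(OpenTelemetry openTelemetry) {
        // The Meter's name and version become otel_scope_name / otel_scope_version attributes.
        Meter meter = openTelemetry.meterBuilder("s3_offloader_plugin")
                .setInstrumentationVersion("1.2")
                .build();
        offloadedEntries = meter.counterBuilder("pulsar.offload.entries")   // placeholder name
                .setUnit("{entry}")
                .build();
    }

    void recordOffloadedEntries(long count) {
        offloadedEntries.add(count);
    }
}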

Why OpenTelemetry?

What’s good about OTel?

  • It's the new, emerging industry-wide standard for observability, and specifically metrics, as opposed to just a library or a standard adopted and promoted by a single entity/company.

  • It’s much more sophisticated than the other libraries

    • OTel has the ability to change instruments by overriding their initial definition. For example, a Pulsar operator can change the buckets of a given histogram, reduce attributes if needed, or even duplicate an instrument with different buckets while keeping the original one. This feature is called a View (see the sketch after this list).
    • Its API is very clear. For example, a gauge cannot be aggregated (e.g., CPU usage), while an UpDownCounter can (e.g., the number of jobs in a queue).
    • Using OpenTelemetry Logs and Traces will allow sharing of context between them, making Pulsar telemetry more powerful and helpful (out of scope for this PIP, but possible).
    • Using an industry-standard API means that when, in the future, libraries accept the OpenTelemetry interface for reporting traces/metrics/logs, integrating them will not require any special development effort.
    • Being an industry standard also means that when new developers onboard, they don't need to learn something new.
    • The SDK is still in the adoption/building phase, so they are more receptive to accepting changes from the community relative to other libraries (This was quite evident from issues I’ve opened that got fixed, community meetings attended, and brainstorming sessions held with maintainers)
    • Its design is the most elegant and correct compared to all other libraries (IMO). The idea of each instrument having an interchangeable aggregation - which is also how they implemented it - is smart. The same goes for the Reader/Exporter separation and Views.
    • It has support for deciding whether one metric or a family of them will be delta or cumulative. For Elasticsearch/OpenSearch users it's super powerful, as it allows them to create the same metrics under different names containing delta values, and then feed only those to Elastic using the OTel Collector.
    • Its protocol is much more efficient than other protocols (e.g., the Prometheus text exposition format).
    • The library allows exporting the metrics as Prometheus and OTLP (push to an OTel Collector), and it's extensible by design.
    • It has the same API and implementation design for Python and Go, which we also need to support for the wrapper code running Pulsar Functions.
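For example, overriding the default buckets of a latency histogram with a View in the OpenTelemetry Java SDK (the instrument name and bucket boundaries are placeholders) might look like this:

import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentSelector;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.View;
import java.util.List;

class ViewExample {
    static SdkMeterProvider meterProviderWithCustomBuckets() {
        return SdkMeterProvider.builder()
                .registerView(
                        InstrumentSelector.builder()
                                .setName("pulsar.messaging.publish.latency")   // placeholder name
                                .build(),
                        View.builder()
                                .setAggregation(Aggregation.explicitBucketHistogram(
                                        List.of(1.0, 5.0, 10.0, 50.0, 100.0, 1000.0)))
                                .build())
                .build();
    }
}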

Why not other libraries?

Below I will list the libraries I found and why I don't think they are suitable.

    • Micrometer
      • Micrometer had the vision of becoming the industry-standard API, like SLF4J is for logging in the Java ecosystem. In reality, it didn't catch on, as can be seen in the Maven Central statistics: it's used by ~1000 artifacts, compared to slf4j-api, which is used by 60k artifacts. As such, picking it as the standard today seems like "betting" on the wrong project.
      • Micrometer architecture relies heavily on the library to implement all target systems like Datadog, Prometheus, Graphite, OTLP, and more. OTel relies on the collector to implement that as it has more power and can contain the state if one of those systems goes down for some time. I think it’s a smarter choice, and more vendors will likely appear and maintain their exporter in OTel collector as we advance. This makes it easier for operators to have one exporter code base (say to Cortex) across different languages of microservices, so it makes sense people will lean towards this framework and request it soon.
      • OTel was built with instrumentation scope in mind, which gives a sort of namespace per library or section of the code (called a Meter in the API). For Pulsar, it can be used to have one per plugin. Micrometer doesn't have that notion. It's especially useful if Pulsar and a plugin use the same library (e.g., Caffeine for caching): in Prometheus or other libraries the metrics would override each other, but in OTel the Meter provides attributes for its name and version, thus providing a namespace.
      • OTel by design has an instrument to which you report measurements for a given attribute set, i.e., instrument = map(attributes → values). In Micrometer, each (instrument, attributes) pair is a metric on its own. Less elegant and more confusing.
      • Most innovations are likely to happen in the “new kids on the block,” which is OTel.
    • Dropwizard Metrics (previously Codahale)
      • Doesn't support different attributes per instrument (no tag support). It was slated for Dropwizard 5.x, but there is no maintainer available to work on it, which is a problem on its own.
    • Prometheus Client
      • Currently, the Prometheus client allocates all needed memory per collection. For a large number of topics, this is a substantial performance issue. We tried conversing with them and pitched an observer pattern. They objected to the idea and wanted benchmark proof. The maintainer thinks it adds complexity. See here. In OTel, they were happy to brainstorm the problem via a GitHub issue, their weekly calls, and private Zoom sessions. They showed actual care for their open-source users, and together we found a solution that is actively being worked on.
      • Only the Prometheus format is supported for export. OTLP is a more compact protocol, since it packs all the buckets as a map of bucket numbers to their values instead of carrying all the labels for each bucket as the Prometheus client does.
      • The library doesn't have the notion of different exporters as it was geared to export only to Prometheus.
      • No integration with Logs or Traces which will be needed in the future.

What we need to fix in OpenTelemetry

  • Performance
    • Once we have 1M topics per broker, with each topic producing ~70 metric data points (under a very relaxed assumption: one producer and one consumer), we're talking about 70M metric data points.
      • The MetricReader and MetricExporter interfaces were designed to receive the metrics collected from memory by the SDK as a list, which means each collection cycle allocates at least 70M objects (metric data points).
      • The OTLP exporter specifically serializes the data points to protobuf by creating a Marshaller object per piece of data in each data point, so 10-30 times 70M metric data points - all objects to be garbage collected.
    • I have opened an issue starting discussion on trying to solve that: https://github.com/open-telemetry/opentelemetry-java/issues/5105
      • After discussion, the maintainer suggested object re-use per attribute set. He has already implemented over 60% of the code needed to support it. We need to help the project finish it as detailed in the issue description (mainly, the collection path should be allocation-free, including exporters).
  • There is a non-configurable hard limit of 2000 attribute sets per instrument
    • There is a PR to the specifications to allow configuring that limit
    • Once the spec is approved, OTel Java SDK must also be amended.
  • Supporting push-down predicate to filter (instrument, attribute) pair in MetricsProducer
  • Fix bug: https://github.com/open-telemetry/opentelemetry-java/issues/4901

Nice to have

Issues completed while writing the design

Specifying units for histograms

We have two ways to do that:

  1. Since all latency histograms with the same unit (milliseconds) share the same buckets, we can use a View: select all instruments in the Pulsar Meter which have a unit of milliseconds, and specify the buckets there.
  2. Specify buckets using the newly added hints (advice) at instrument creation. This requires creating a constant and re-using it across all histograms. A sketch of this option appears below.
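A minimal sketch of option 2, assuming a recent opentelemetry-java API version that exposes the bucket-boundaries advice method (the instrument name is a placeholder):

import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import java.util.List;

class LatencyHistograms {
    // Shared constant reused by every millisecond-latency histogram.
    static final List<Double> LATENCY_MS_BUCKETS =
            List.of(1.0, 5.0, 10.0, 20.0, 50.0, 100.0, 200.0, 1000.0);

    static DoubleHistogram publishLatency(Meter meter) {
        return meter.histogramBuilder("pulsar.messaging.publish.latency")   // placeholder name
                .setUnit("ms")
                .setExplicitBucketBoundariesAdvice(LATENCY_MS_BUCKETS)
                .build();
    }
}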

Filtering Configuration

As mentioned in the high-level design, we will define an interface allowing multiple built-in and custom filtering implementations. The interface will determine, for each metric data point, whether it will be filtered out or not. The data point is composed of: instrument name, attributes, unit, and type.
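A hedged sketch of what such an interface could look like (the names are assumptions; the sub-PIP will define the real API):

import io.opentelemetry.api.common.Attributes;

interface MetricFilter {
    enum InstrumentKind { COUNTER, UP_DOWN_COUNTER, GAUGE, HISTOGRAM }

    /** Returns true if the (instrument, attributes) data point should be kept, false to drop it. */
    boolean shouldKeep(String instrumentName, Attributes attributes, String unit, InstrumentKind kind);
}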

Our default implementation will be rule-based and will have the following configuration. I used HOCON here to make it less verbose; the exact syntax will be determined in the sub-PIP. The configuration will be dynamic, allowing operators to change it at runtime. The exact mechanisms will be detailed in the sub-PIP.

rules {
  // All instruments starting with pulsar_, with a topic attribute
  // will be dropped by default. This will keep only topicMetricGroup and namespace level.
  default {
    instrumentSelect = "pulsar_*"
    attrSelect {
      topic = "*"
    }
    filterInstruments {
      dropAll = true
    }
  }

  // single topic, highest granularity
  bi-data {
    instrumentSelect = "pulsar_*"
    attrSelect {
      topicGroup = "bi-data"
    }
    filterInstruments {
      keepAll = true
    }
  }

  // multiple topics, highest granularity, only metrics I need	
  receipts {
    instrumentSelect = "pulsar_*"
    attrSelect {
      topic = "receipts-us-*"
    }
    filterInstruments {
      keepOnly = ["pulsar_rate_*"]
    }
  }

  // single topic, don't want subscriptions and consumer level
  logs {
    instrumentSelect = "pulsar_*"
    attrSelect {
      topic = "logs"
    }
    filterInstruments {
      dropOnly = ["pulsar_subscription_*", "pulsar_consumer_*"]
    }
  }
}

The configuration is made up of a list of “filtering rules”. Each rule stipulates the following:

  • A name, for documentation purposes.
  • instrumentSelect - the ability to select one or more instruments to apply this filtering rule to.
    • Example: pulsar_*
  • attrSelect - the ability to select a group of attributes to apply this rule to, within the selected instruments.
    • Example: topic=receipt-us-*
  • filterInstruments - For each instrument matched, we allow setting whether we wish to drop or keep the selected attributes for certain instruments.
    • Either:
      • dropOnly - lists the instruments we wish to drop the selected attributes for. Example: if instrumentSelect is pulsar_*, attrSelect is topic="incoming-logs", and dropOnly is pulsar_subscription_* and pulsar_consumer_*, then all messaging instruments at the topic level will remain, and subscription- and consumer-level metrics will be dropped, only for the "incoming-logs" topic.
      • keepOnly - The opposite of dropOnly.
      • dropAll
      • keepAll

Order matters for the list of filtering rules. This allows us to set a default which applies to a wide range of instruments and then override it for certain instruments.

We will supply a sensible default filtering-rules configuration.

In certain cases the number of rules can reach 10k or 20k. Since each rule is essentially a regular expression, we will need to cache the resolution of (instrument, attributes) to true/false. Even then, evaluating 10k regular expressions on a cache miss might be too much, so we can use the Aho-Corasick algorithm to narrow the match down to a select few regular expressions.
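A hedged sketch of the caching idea, reusing the MetricFilter interface sketched earlier (names are assumptions):

import io.opentelemetry.api.common.Attributes;
import java.util.concurrent.ConcurrentHashMap;

class CachingMetricFilter {
    // Attributes has value-based equals/hashCode, so (instrument, attributes) works as a cache key.
    record Key(String instrumentName, Attributes attributes) {}

    private final MetricFilter delegate;   // the rule-based (regex) filter
    private final ConcurrentHashMap<Key, Boolean> cache = new ConcurrentHashMap<>();

    CachingMetricFilter(MetricFilter delegate) {
        this.delegate = delegate;
    }

    boolean shouldKeep(String instrumentName, Attributes attributes, String unit,
                       MetricFilter.InstrumentKind kind) {
        // Evaluate the full rule set only once per (instrument, attributes) pair.
        return cache.computeIfAbsent(new Key(instrumentName, attributes),
                k -> delegate.shouldKeep(instrumentName, attributes, unit, kind));
    }
}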

We have proposed an issue in the OpenTelemetry Java SDK to have a push-down predicate, allowing us to do the filtering while iterating over instruments and their attributes. It means the OTel SDK will not allocate data-point objects if a certain (instrument, attributes) pair is filtered out. See the section above on what we wish to fix in OTel for the issue link.

There will be a sub-PIP detailing the exact configuration syntax, storage, how it will be made dynamic, plugin details, and more. We will take into account the user experience of defining and updating that configuration, especially when it reaches a large size. Also, we want the experience of adding a rule to be smooth, so the CLI should offer auto-complete of metric names if possible, and optionally a dedicated UI. We will also try to protect and validate the filtering rules, such that we can reject an update if the resulting number of data points would exceed a certain threshold - this protects against accidental mistakes in the configuration (we will accept the update if a force parameter is supplied).

Which summaries / histograms are affected?

As mentioned in the design, summaries will be converted into histograms, and both those and the existing histograms will be modified to be at namespace granularity.

Summaries

Prometheus Client

  • LedgerOffloaderStatsImpl
    • brk_ledgeroffloader_read_offload_index_latency
    • brk_ledgeroffloader_read_offload_data_latency
    • brk_ledgeroffloader_read_ledger_latency
    • We need to change to be reported per namespace, not per topic
  • ResourceGroupService
    • pulsar_resource_group_aggregate_usage_secs
      • Time required to aggregate usage of all resource groups, in seconds.
    • pulsar_resource_group_calculate_quota_secs
      • Time required to calculate quota of all resource groups, in seconds
    • Not high cardinality, no need to modify aggregation level.
  • ReplicatedSubscriptionsSnapshotBuilder
    • pulsar_replicated_subscriptions_snapshot_ms
      • Time taken to create a consistent snapshot across clusters
      • Can be NS level
  • SchemaRegistryStats
    • pulsar_schema_del_ops_latency
    • pulsar_schema_get_ops_latency
    • pulsar_schema_put_ops_latency
  • BrokerOperabilityMetrics
    • topic_load_times
      • in milliseconds
  • TransactionBufferClientStatsImpl
    • pulsar_txn_tb_client_abort_latency
    • pulsar_txn_tb_client_commit_latency
    • Change it to be NS level, and not topic level.
  • PendingAckHandleStatsImpl
    • pulsar_txn_tp_commit_latency
  • ContextImpl
    • pulsar_function_user_metric_*
      • not high cardinality, and each process runs only one function, so we can use histograms freely.
  • FunctionStatsManager
    • pulsar_function_process_latency_ms
    • pulsar_function_process_latency_ms_1min
      • we'll remove the 1min ones
      • not high cardinality, and each process runs only one function, so we can use histograms freely
  • WorkerStatsManager
    • pulsar_function_worker_start_up_time_ms
      • Shouldn’t be a summary in the first place as it is init once
    • schedule_execution_time_total_ms
    • 6 more of those

Our Summary

  • ModularLoadManagerImpl
    • pulsar_broker_load_manager_bundle_assigment
    • pulsar_broker_lookup
  • AbstractTopic
    • pulsar_broker_publish_latency
      • broker level

OpsStatLogger (uses DataSketches)

  • PulsarZooKeeperClient
    • one instance per action running against ZK
  • Bookkeeper client metrics
    • About 10 operation latencies

Histograms

StatsBucket (Our version of Explicit Bucket Histogram)

  • ManagedLedgerMBeanImpl
    • pulsar_storage_write_latency_le_*
    • pulsar_storage_ledger_write_latency_le_*
    • pulsar_entry_size_le_*
    • all above are topic level
  • CompactionRecord
    • pulsar_compaction_latency_*
      • topic level
  • TransactionMetadataStoreStats
    • pulsar_txn_execution_latency_le_
      • labels: cluster, coordinatorId

Fixing Grafana Dashboards in repository

The Pulsar repository contains multiple dashboards. We will create the same dashboards under new names, alongside the existing ones. We'll add dashboards for different granularity levels: namespace, group, and zooming in on a specific group/topic. The dashboards will look the same as the existing ones, since the main change is to each panel's query - the metric names have changed, not the semantics.

This will be specified in a sub-PIP.

Backward Compatibility

Breaking changes

All changes reported here apply to the newly added OTel metrics layer. At first, as mentioned in the document, both the existing metrics system and the OTel metrics system can be used - you can toggle each. Once everything has stabilized, we'll deprecate the current metrics system.

  • Names
    • Attribute names will use OTel semantic conventions as much as possible and also Attribute Naming guide.
    • Instrument names will follow guidelines mentioned in OTel Metrics Semantic Conventions
    • Histograms will not encode the bucket range in the instrument name
    • Each domain will have a proper prefix in the instrument name. The biggest example is messaging-related metrics, which today are prefixed with pulsar_ but should be pulsar_messaging_.
  • Summary metrics are changed to be histogram metrics
  • Histograms are changed from topic level to namespace level.
  • The following configuration flags will be deprecated and eventually removed: exposeConsumerLevelMetricsInPrometheus, exposeTopicLevelMetricsInPrometheus and exposeProducerLevelMetricsInPrometheus.
  • Counters / Histograms / Gauges will no longer be reset, but will be cumulative (the user will have the option to switch to delta temporality, for backends which support it, via OpenTelemetry configuration).
  • The topic attribute will not contain the partition number; it will be specified via a partitionNum attribute instead.
  • Configuration splitTopicAndPartitionLabelInPrometheus will be deprecated and eventually removed.
  • Most Pulsar plugins will be modified to allow reporting metrics to OTel via a special object Pulsar will supply. Any plugin not using it will not have its metrics reported to OTel.
  • In phase 1 we won't support the Process runtime in OTel metrics, only Thread and Kubernetes. If the community asks for it and the discussion deems it a must, we'll add it.
  • All _1min metrics are removed from Pulsar Function metrics
  • resetMetrics RPC operation in processes running Pulsar Function will be removed
  • User-defined metrics in Pulsar Functions will be reported as 0-bucket histograms (offering count and sum) instead of Summaries.

API Changes

The sub-PIPs will specify the exact API changes relevant to their constrained scope, since it will be much easier to review as such.

Security

The sub-PIPs will specify the exact security concerns relevant to their constrained scope, since it will be much easier to review as such.

Links