Current support for metric instrumentation in the Pulsar client is very limited and poses a lot of issues for integrating the metrics into any telemetry system.
We have 2 ways that metrics are exposed today:

- `producer.getStats()` or `consumer.getStats()`: Calling these methods gives access to the rate of events in the last 1-minute interval. This is problematic because, out of the box, the metrics are not collected anywhere. One would have to start a dedicated thread to periodically check these values and export them to some other system.

Neither of these mechanisms is sufficient to let applications easily export the telemetry data of the Pulsar client SDK.
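To make the problem concrete, here is a minimal sketch of the glue code an application has to write and own today. A background thread polls a stats snapshot on a fixed period and forwards it to the application's own exporter. `StatsSnapshot`, `Exporter` and `StatsPoller` are hypothetical stand-ins (for `ProducerStats`/`ConsumerStats` and whatever telemetry system the application uses). They let the sketch run without Pulsar on the classpath; this is not a Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for ProducerStats/ConsumerStats: rates over the last interval.
record StatsSnapshot(String topic, double msgsRate, double bytesRate) {}

// Hypothetical sink representing "some other system" the application exports to.
interface Exporter {
    void export(StatsSnapshot snapshot);
}

// Hypothetical glue code: every application would need its own version of this.
final class StatsPoller implements AutoCloseable {
    private final List<Supplier<StatsSnapshot>> sources = new ArrayList<>();
    private final Exporter exporter;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    StatsPoller(Exporter exporter) {
        this.exporter = exporter;
    }

    // Each producer/consumer must be registered by hand, e.g.
    // register(() -> toSnapshot(producer.getStats())), where toSnapshot is application code.
    synchronized void register(Supplier<StatsSnapshot> source) {
        sources.add(source);
    }

    // One polling pass: read every registered source and forward it to the exporter.
    synchronized void pollOnce() {
        for (Supplier<StatsSnapshot> source : sources) {
            exporter.export(source.get());
        }
    }

    // Poll periodically; the period should match the interval the stats are computed over.
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::pollOnce, period, period, unit);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Registration, scheduling, lifecycle and error handling all fall on the application, and each application ends up with its own variant of this code.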
Provide a good way for applications to retrieve and analyze the usage of Pulsar client operations, in particular with respect to:
OpenTelemetry is quickly becoming the de facto standard API for metric and tracing instrumentation. In fact, as part of PIP-264, we are already migrating the Pulsar server-side metrics to OpenTelemetry.
For the Pulsar client SDK, we need to provide a similar way for application developers to quickly integrate and export Pulsar metrics.
When deciding how to expose the metrics exporter configuration, there are multiple options:

1. Accept an `OpenTelemetry` object directly in the Pulsar API

For this proposal, we are following option (1). Here are the reasons:
When building a `PulsarClient` instance, it will be possible to pass an `OpenTelemetry` object:
```java
interface ClientBuilder {
    // ...
    ClientBuilder openTelemetry(io.opentelemetry.api.OpenTelemetry openTelemetry);
}
```
The common usage for an application would be something like:
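```java
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import org.apache.pulsar.client.api.PulsarClient;

// Creates an OpenTelemetry instance using environment variables to configure it
OpenTelemetry otel = AutoConfiguredOpenTelemetrySdk.builder().build()
        .getOpenTelemetrySdk();

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .openTelemetry(otel)
        .build();

// ....
```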
Even without passing the `OpenTelemetry` instance to the Pulsar client SDK, an application using the OpenTelemetry agent will be able to instrument the Pulsar client automatically, because we default to using `GlobalOpenTelemetry.get()`.
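The default described above amounts to "use the instance passed to the builder, otherwise the global one". Below is a minimal, dependency-free sketch of that resolution logic. `Telemetry`, `GlobalTelemetry` and `ClientBuilderSketch` are hypothetical, simplified stand-ins for `io.opentelemetry.api.OpenTelemetry`, `GlobalOpenTelemetry` and the Pulsar `ClientBuilder`. It is an illustration, not the actual Pulsar implementation.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for io.opentelemetry.api.OpenTelemetry.
interface Telemetry {
    String name();
}

// Hypothetical stand-in for GlobalOpenTelemetry: a process-wide instance that an agent
// (or the application) may install; until then it is a no-op implementation.
final class GlobalTelemetry {
    private static final Telemetry NOOP = () -> "noop";
    private static final AtomicReference<Telemetry> INSTANCE = new AtomicReference<>(NOOP);

    private GlobalTelemetry() {}

    static Telemetry get() {
        return INSTANCE.get();
    }

    static void set(Telemetry telemetry) {
        INSTANCE.set(Objects.requireNonNull(telemetry));
    }
}

// Hypothetical builder-side resolution.
final class ClientBuilderSketch {
    private Telemetry telemetry; // null means "not configured explicitly"

    ClientBuilderSketch openTelemetry(Telemetry telemetry) {
        this.telemetry = Objects.requireNonNull(telemetry);
        return this;
    }

    // Resolved when the client is built: an explicit instance wins, else the global one.
    Telemetry resolveTelemetry() {
        return telemetry != null ? telemetry : GlobalTelemetry.get();
    }
}
```

In this sketch the fallback is resolved when the client is built rather than when the builder is created. A global instance installed by an agent at startup is therefore picked up without any application code.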
The old way of collecting stats will be deprecated in phases:
Methods to deprecate:
```java
interface ClientBuilder {
    // ...
    @Deprecated
    ClientBuilder statsInterval(long statsInterval, TimeUnit unit);
}

interface Producer {
    @Deprecated
    ProducerStats getStats();
}

interface Consumer {
    @Deprecated
    ConsumerStats getStats();
}
```
Based on the experience of the Pulsar Go client SDK metrics (see: https://github.com/apache/pulsar-client-go/blob/master/pulsar/internal/metrics.go), this is the proposed initial set of metrics to export.
Additional metrics could be added later on, though it's better to start with the set of most important metrics and then evaluate any missing information.
These metric names and attributes will be considered "Experimental" for the 3.3 release and may be subject to change. The plan is to finalize all the names in the 4.0 LTS release.
Attributes enclosed in brackets, such as [`pulsar.topic`], will not be included by default, to avoid high-cardinality metrics.
OTel metric name | Type | Unit | Attributes | Description |
---|---|---|---|---|
`pulsar.client.connection.opened` | Counter | connections | | The number of connections opened |
`pulsar.client.connection.closed` | Counter | connections | | The number of connections closed |
`pulsar.client.connection.failed` | Counter | connections | | The number of failed connection attempts |
`pulsar.client.producer.opened` | Counter | sessions | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`] | The number of producer sessions opened |
`pulsar.client.producer.closed` | Counter | sessions | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`] | The number of producer sessions closed |
`pulsar.client.consumer.opened` | Counter | sessions | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.subscription` | The number of consumer sessions opened |
`pulsar.client.consumer.closed` | Counter | sessions | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.subscription` | The number of consumer sessions closed |
`pulsar.client.consumer.message.received.count` | Counter | messages | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.subscription` | The number of messages explicitly received by the consumer application |
`pulsar.client.consumer.message.received.size` | Counter | bytes | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.subscription` | The number of bytes explicitly received by the consumer application |
`pulsar.client.consumer.receive_queue.count` | UpDownCounter | messages | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.subscription` | The number of messages currently sitting in the consumer receive queue |
`pulsar.client.consumer.receive_queue.size` | UpDownCounter | bytes | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.subscription` | The total size in bytes of messages currently sitting in the consumer receive queue |
`pulsar.client.consumer.message.ack` | Counter | messages | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.subscription` | The number of acknowledged messages |
`pulsar.client.consumer.message.nack` | Counter | messages | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.subscription` | The number of negatively acknowledged messages |
`pulsar.client.consumer.message.dlq` | Counter | messages | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.subscription` | The number of messages sent to the DLQ |
`pulsar.client.consumer.message.ack.timeout` | Counter | messages | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.subscription` | The number of messages that were not acknowledged within the configured timeout period and were therefore requested by the client to be redelivered |
`pulsar.client.producer.message.send.duration` | Histogram | seconds | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`] | Publish latency experienced by the application, including client batching time |
`pulsar.client.producer.rpc.send.duration` | Histogram | seconds | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.response.status="success\|failed"` | Publish RPC latency experienced internally by the client, from sending data to receiving an ack |
`pulsar.client.producer.message.send.size` | Counter | bytes | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`], `pulsar.response.status="success\|failed"` | The number of bytes published |
`pulsar.client.producer.message.pending.count` | UpDownCounter | messages | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`] | The number of messages in the producer internal send queue, waiting to be sent |
`pulsar.client.producer.message.pending.size` | UpDownCounter | bytes | `pulsar.tenant`, `pulsar.namespace`, [`pulsar.topic`], [`pulsar.partition`] | The size in bytes of the messages in the producer internal send queue, waiting to be sent |
`pulsar.client.lookup.duration` | Histogram | seconds | `pulsar.lookup.transport-type="binary\|http"`, `pulsar.lookup.type="topic\|metadata\|schema\|list-topics"`, `pulsar.response.status="success\|failed"` | Duration of different types of client lookup operations |
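The two producer latency histograms differ in where the clock starts. Below is a minimal sketch, assuming a batching producer in which a message is enqueued by `send()`, later flushed as part of a batch RPC, and then acknowledged by the broker. `message.send.duration` spans enqueue to ack, so it includes batching time. `rpc.send.duration` spans batch flush to ack. `SendTimestamps` and `LatencyRecorder` are hypothetical names. Recording the RPC once per batch is an assumption of the sketch. Samples go into plain lists rather than real OpenTelemetry histograms.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical System.nanoTime() values captured along one message's send path.
record SendTimestamps(long enqueuedNanos, long batchFlushedNanos, long ackedNanos) {}

// Collects the samples that a real client would record into two histograms (unit: seconds).
final class LatencyRecorder {
    final List<Double> messageSendSeconds = new ArrayList<>(); // ...producer.message.send.duration
    final List<Double> rpcSendSeconds = new ArrayList<>();     // ...producer.rpc.send.duration

    private static double toSeconds(long nanos) {
        return nanos / 1_000_000_000.0;
    }

    // Called when the broker acks a batch: each message gets its own end-to-end sample,
    // which includes the time it waited for the batch to be flushed.
    void onBatchAcked(List<SendTimestamps> batch) {
        if (batch.isEmpty()) {
            return;
        }
        for (SendTimestamps t : batch) {
            messageSendSeconds.add(toSeconds(t.ackedNanos() - t.enqueuedNanos()));
        }
        // Assumption: one RPC per batch, so the RPC latency is recorded once.
        SendTimestamps first = batch.get(0);
        rpcSendSeconds.add(toSeconds(first.ackedNanos() - first.batchFlushedNanos()));
    }
}
```

As an example, take two messages enqueued at 0 ms and 5 ms, flushed together at 10 ms and acked at 30 ms. The messages report 30 ms and 25 ms, while the RPC reports 20 ms. The difference is the batching delay that only `message.send.duration` exposes.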
The metric data points will be tagged with these attributes:

- `pulsar.tenant`
- `pulsar.namespace`
- `pulsar.topic`
- `pulsar.partition`
By default, the metrics will be exported with the tenant and namespace attributes set. If an application wants to enable a finer level, with higher cardinality, it can do so through OpenTelemetry configuration.
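To illustrate how the default attributes keep cardinality low, here is a minimal sketch. It derives the attribute set from a fully qualified topic name such as `persistent://public/default/orders-partition-0` and adds `pulsar.topic` and `pulsar.partition` only when the finer level is requested. `TopicAttributes` and its `includeTopic` flag are hypothetical: in the proposal, the finer level is selected through OpenTelemetry configuration, not a Pulsar flag. The sketch makes three further assumptions:

- Topic names follow the `<domain>://<tenant>/<namespace>/<topic>` shape.
- Partitions use the `-partition-<N>` suffix.
- `pulsar.namespace` carries `<tenant>/<namespace>` and `pulsar.topic` carries the base (non-partitioned) topic name.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: maps a fully qualified topic name to metric attributes.
final class TopicAttributes {
    private static final String SCHEME_SEPARATOR = "://";
    private static final String PARTITION_SUFFIX = "-partition-";

    private TopicAttributes() {}

    static Map<String, String> of(String fullTopicName, boolean includeTopic) {
        // Expected shape: <domain>://<tenant>/<namespace>/<local-name>
        int schemeEnd = fullTopicName.indexOf(SCHEME_SEPARATOR);
        String[] parts = schemeEnd < 0
                ? new String[0]
                : fullTopicName.substring(schemeEnd + SCHEME_SEPARATOR.length()).split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Not a fully qualified topic name: " + fullTopicName);
        }
        String tenant = parts[0];
        // Assumption: the namespace attribute carries "<tenant>/<namespace>".
        String namespace = tenant + "/" + parts[1];

        Map<String, String> attrs = new LinkedHashMap<>();
        attrs.put("pulsar.tenant", tenant);
        attrs.put("pulsar.namespace", namespace);
        if (!includeTopic) {
            return attrs; // default: low-cardinality attributes only
        }

        // Split "<base>-partition-<N>" into the base topic name and the partition index.
        String localName = parts[2];
        String partition = null;
        int idx = localName.lastIndexOf(PARTITION_SUFFIX);
        if (idx > 0) {
            String suffix = localName.substring(idx + PARTITION_SUFFIX.length());
            if (!suffix.isEmpty() && suffix.chars().allMatch(Character::isDigit)) {
                partition = suffix;
                localName = localName.substring(0, idx);
            }
        }
        String domain = fullTopicName.substring(0, schemeEnd + SCHEME_SEPARATOR.length());
        attrs.put("pulsar.topic", domain + namespace + "/" + localName);
        if (partition != null) {
            attrs.put("pulsar.partition", partition);
        }
        return attrs;
    }

    // Number of distinct attribute sets (time series per metric) produced by the given topics.
    static int seriesCount(List<String> topics, boolean includeTopic) {
        Set<Map<String, String>> distinct = new HashSet<>();
        for (String topic : topics) {
            distinct.add(of(topic, includeTopic));
        }
        return distinct.size();
    }
}
```

As an example, consider three topics: `persistent://public/default/orders-partition-0`, `persistent://public/default/orders-partition-1` and `persistent://public/default/payments`. By default they collapse into a single series per metric. With the topic and partition attributes included they become three, and the count keeps growing with every topic and partition the client touches.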