blob: e9bc81522caac7050eb029879d36897c61050797 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal
import (
"github.com/prometheus/client_golang/prometheus"
)
type Metrics struct {
metricsLevel int
messagesPublished *prometheus.CounterVec
bytesPublished *prometheus.CounterVec
messagesPending *prometheus.GaugeVec
bytesPending *prometheus.GaugeVec
publishErrors *prometheus.CounterVec
publishLatency *prometheus.HistogramVec
publishRPCLatency *prometheus.HistogramVec
messagesReceived *prometheus.CounterVec
bytesReceived *prometheus.CounterVec
prefetchedMessages *prometheus.GaugeVec
prefetchedBytes *prometheus.GaugeVec
acksCounter *prometheus.CounterVec
nacksCounter *prometheus.CounterVec
dlqCounter *prometheus.CounterVec
processingTime *prometheus.HistogramVec
producersOpened *prometheus.CounterVec
producersClosed *prometheus.CounterVec
producersReconnectFailure *prometheus.CounterVec
producersReconnectMaxRetry *prometheus.CounterVec
producersPartitions *prometheus.GaugeVec
consumersOpened *prometheus.CounterVec
consumersClosed *prometheus.CounterVec
consumersReconnectFailure *prometheus.CounterVec
consumersReconnectMaxRetry *prometheus.CounterVec
consumersPartitions *prometheus.GaugeVec
readersOpened *prometheus.CounterVec
readersClosed *prometheus.CounterVec
// Metrics that are not labeled with specificity are immediately available
ConnectionsOpened prometheus.Counter
ConnectionsClosed prometheus.Counter
ConnectionsEstablishmentErrors prometheus.Counter
ConnectionsHandshakeErrors prometheus.Counter
LookupRequestsCount prometheus.Counter
PartitionedTopicMetadataRequestsCount prometheus.Counter
RPCRequestCount prometheus.Counter
}
type LeveledMetrics struct {
MessagesPublished prometheus.Counter
BytesPublished prometheus.Counter
MessagesPending prometheus.Gauge
BytesPending prometheus.Gauge
PublishErrorsTimeout prometheus.Counter
PublishErrorsMsgTooLarge prometheus.Counter
PublishLatency prometheus.Observer
PublishRPCLatency prometheus.Observer
MessagesReceived prometheus.Counter
BytesReceived prometheus.Counter
PrefetchedMessages prometheus.Gauge
PrefetchedBytes prometheus.Gauge
AcksCounter prometheus.Counter
NacksCounter prometheus.Counter
DlqCounter prometheus.Counter
ProcessingTime prometheus.Observer
ProducersOpened prometheus.Counter
ProducersClosed prometheus.Counter
ProducersReconnectFailure prometheus.Counter
ProducersReconnectMaxRetry prometheus.Counter
ProducersPartitions prometheus.Gauge
ConsumersOpened prometheus.Counter
ConsumersClosed prometheus.Counter
ConsumersReconnectFailure prometheus.Counter
ConsumersReconnectMaxRetry prometheus.Counter
ConsumersPartitions prometheus.Gauge
ReadersOpened prometheus.Counter
ReadersClosed prometheus.Counter
}
// NewMetricsProvider returns metrics registered to registerer.
func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]string,
registerer prometheus.Registerer) *Metrics {
constLabels := map[string]string{
"client": "go",
}
for k, v := range userDefinedLabels {
constLabels[k] = v
}
var metricsLevelLabels []string
// note: ints here mirror MetricsCardinality in client.go to avoid import cycle
switch metricsCardinality {
case 1: //MetricsCardinalityNone
metricsLevelLabels = []string{}
case 2: //MetricsCardinalityTenant
metricsLevelLabels = []string{"pulsar_tenant"}
case 3: //MetricsCardinalityNamespace
metricsLevelLabels = []string{"pulsar_tenant", "pulsar_namespace"}
case 4: //MetricsCardinalityTopic
metricsLevelLabels = []string{"pulsar_tenant", "pulsar_namespace", "topic"}
default: //Anything else is namespace
metricsLevelLabels = []string{"pulsar_tenant", "pulsar_namespace"}
}
metrics := &Metrics{
metricsLevel: metricsCardinality,
messagesPublished: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_messages_published",
Help: "Counter of messages published by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
bytesPublished: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_bytes_published",
Help: "Counter of messages published by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
messagesPending: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_producer_pending_messages",
Help: "Counter of messages pending to be published by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
bytesPending: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_producer_pending_bytes",
Help: "Counter of bytes pending to be published by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
publishErrors: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producer_errors",
Help: "Counter of publish errors",
ConstLabels: constLabels,
}, append(metricsLevelLabels, "error")),
publishLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "pulsar_client_producer_latency_seconds",
Help: "Publish latency experienced by the client",
ConstLabels: constLabels,
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
}, metricsLevelLabels),
publishRPCLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "pulsar_client_producer_rpc_latency_seconds",
Help: "Publish RPC latency experienced internally by the client when sending data to receiving an ack",
ConstLabels: constLabels,
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
}, metricsLevelLabels),
producersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producers_opened",
Help: "Counter of producers created by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
producersClosed: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producers_closed",
Help: "Counter of producers closed by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
producersPartitions: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_producers_partitions_active",
Help: "Counter of individual partitions the producers are currently active",
ConstLabels: constLabels,
}, metricsLevelLabels),
producersReconnectFailure: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producers_reconnect_failure",
Help: "Counter of reconnect failure of producers",
ConstLabels: constLabels,
}, metricsLevelLabels),
producersReconnectMaxRetry: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_producers_reconnect_max_retry",
Help: "Counter of producer reconnect max retry reached",
ConstLabels: constLabels,
}, metricsLevelLabels),
consumersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumers_opened",
Help: "Counter of consumers created by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
consumersClosed: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumers_closed",
Help: "Counter of consumers closed by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
consumersReconnectFailure: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumers_reconnect_failure",
Help: "Counter of reconnect failure of consumers",
ConstLabels: constLabels,
}, metricsLevelLabels),
consumersReconnectMaxRetry: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumers_reconnect_max_retry",
Help: "Counter of consumer reconnect max retry reached",
ConstLabels: constLabels,
}, metricsLevelLabels),
consumersPartitions: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_consumers_partitions_active",
Help: "Counter of individual partitions the consumers are currently active",
ConstLabels: constLabels,
}, metricsLevelLabels),
messagesReceived: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_messages_received",
Help: "Counter of messages received by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
bytesReceived: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_bytes_received",
Help: "Counter of bytes received by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
prefetchedMessages: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_consumer_prefetched_messages",
Help: "Number of messages currently sitting in the consumer pre-fetch queue",
ConstLabels: constLabels,
}, metricsLevelLabels),
prefetchedBytes: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "pulsar_client_consumer_prefetched_bytes",
Help: "Total number of bytes currently sitting in the consumer pre-fetch queue",
ConstLabels: constLabels,
}, metricsLevelLabels),
acksCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumer_acks",
Help: "Counter of messages acked by client",
ConstLabels: constLabels,
}, metricsLevelLabels),
nacksCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumer_nacks",
Help: "Counter of messages nacked by client",
ConstLabels: constLabels,
}, metricsLevelLabels),
dlqCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_consumer_dlq_messages",
Help: "Counter of messages sent to Dead letter queue",
ConstLabels: constLabels,
}, metricsLevelLabels),
processingTime: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "pulsar_client_consumer_processing_time_seconds",
Help: "Time it takes for application to process messages",
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
ConstLabels: constLabels,
}, metricsLevelLabels),
readersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_readers_opened",
Help: "Counter of readers created by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
readersClosed: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "pulsar_client_readers_closed",
Help: "Counter of readers closed by the client",
ConstLabels: constLabels,
}, metricsLevelLabels),
ConnectionsOpened: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_connections_opened",
Help: "Counter of connections created by the client",
ConstLabels: constLabels,
}),
ConnectionsClosed: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_connections_closed",
Help: "Counter of connections closed by the client",
ConstLabels: constLabels,
}),
ConnectionsEstablishmentErrors: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_connections_establishment_errors",
Help: "Counter of errors in connections establishment",
ConstLabels: constLabels,
}),
ConnectionsHandshakeErrors: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_connections_handshake_errors",
Help: "Counter of errors in connections handshake (eg: authz)",
ConstLabels: constLabels,
}),
LookupRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_lookup_count",
Help: "Counter of lookup requests made by the client",
ConstLabels: constLabels,
}),
PartitionedTopicMetadataRequestsCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_partitioned_topic_metadata_count",
Help: "Counter of partitioned_topic_metadata requests made by the client",
ConstLabels: constLabels,
}),
RPCRequestCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_rpc_count",
Help: "Counter of RPC requests made by the client",
ConstLabels: constLabels,
}),
}
err := registerer.Register(metrics.messagesPublished)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.messagesPublished = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.bytesPublished)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.bytesPublished = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.messagesPending)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.messagesPending = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.bytesPending)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.bytesPending = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.publishErrors)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.publishErrors = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.publishLatency)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.publishLatency = are.ExistingCollector.(*prometheus.HistogramVec)
}
}
err = registerer.Register(metrics.publishRPCLatency)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.publishRPCLatency = are.ExistingCollector.(*prometheus.HistogramVec)
}
}
err = registerer.Register(metrics.messagesReceived)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.messagesReceived = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.bytesReceived)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.bytesReceived = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.prefetchedMessages)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.prefetchedMessages = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.prefetchedBytes)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.prefetchedBytes = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.acksCounter)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.acksCounter = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.nacksCounter)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.nacksCounter = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.dlqCounter)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.dlqCounter = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.processingTime)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.processingTime = are.ExistingCollector.(*prometheus.HistogramVec)
}
}
err = registerer.Register(metrics.producersOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersOpened = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.producersClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersClosed = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.producersReconnectFailure)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersReconnectFailure = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.producersReconnectMaxRetry)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersReconnectMaxRetry = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.producersPartitions)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.producersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.consumersOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersOpened = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.consumersClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersClosed = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.consumersReconnectFailure)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersReconnectFailure = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.consumersReconnectMaxRetry)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersReconnectMaxRetry = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.consumersPartitions)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.consumersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
}
}
err = registerer.Register(metrics.readersOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.readersOpened = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.readersClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.readersClosed = are.ExistingCollector.(*prometheus.CounterVec)
}
}
err = registerer.Register(metrics.ConnectionsOpened)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsOpened = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.ConnectionsClosed)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsClosed = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.ConnectionsEstablishmentErrors)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsEstablishmentErrors = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.ConnectionsHandshakeErrors)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.ConnectionsHandshakeErrors = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.LookupRequestsCount)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.LookupRequestsCount = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.PartitionedTopicMetadataRequestsCount)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.PartitionedTopicMetadataRequestsCount = are.ExistingCollector.(prometheus.Counter)
}
}
err = registerer.Register(metrics.RPCRequestCount)
if err != nil {
if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
metrics.RPCRequestCount = are.ExistingCollector.(prometheus.Counter)
}
}
return metrics
}
func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics {
labels := make(map[string]string, 3)
tn, err := ParseTopicName(t)
if err != nil {
return nil
}
topic := TopicNameWithoutPartitionPart(tn)
switch mp.metricsLevel {
case 4:
labels["topic"] = topic
fallthrough
case 3:
labels["pulsar_namespace"] = tn.Namespace
fallthrough
case 2:
labels["pulsar_tenant"] = tn.Tenant
}
lm := &LeveledMetrics{
MessagesPublished: mp.messagesPublished.With(labels),
BytesPublished: mp.bytesPublished.With(labels),
MessagesPending: mp.messagesPending.With(labels),
BytesPending: mp.bytesPending.With(labels),
PublishErrorsTimeout: mp.publishErrors.With(mergeMaps(labels, map[string]string{"error": "timeout"})),
PublishErrorsMsgTooLarge: mp.publishErrors.With(mergeMaps(labels, map[string]string{"error": "msg_too_large"})),
PublishLatency: mp.publishLatency.With(labels),
PublishRPCLatency: mp.publishRPCLatency.With(labels),
MessagesReceived: mp.messagesReceived.With(labels),
BytesReceived: mp.bytesReceived.With(labels),
PrefetchedMessages: mp.prefetchedMessages.With(labels),
PrefetchedBytes: mp.prefetchedBytes.With(labels),
AcksCounter: mp.acksCounter.With(labels),
NacksCounter: mp.nacksCounter.With(labels),
DlqCounter: mp.dlqCounter.With(labels),
ProcessingTime: mp.processingTime.With(labels),
ProducersOpened: mp.producersOpened.With(labels),
ProducersClosed: mp.producersClosed.With(labels),
ProducersReconnectFailure: mp.producersReconnectFailure.With(labels),
ProducersReconnectMaxRetry: mp.producersReconnectMaxRetry.With(labels),
ProducersPartitions: mp.producersPartitions.With(labels),
ConsumersOpened: mp.consumersOpened.With(labels),
ConsumersClosed: mp.consumersClosed.With(labels),
ConsumersReconnectFailure: mp.consumersReconnectFailure.With(labels),
ConsumersReconnectMaxRetry: mp.consumersReconnectMaxRetry.With(labels),
ConsumersPartitions: mp.consumersPartitions.With(labels),
ReadersOpened: mp.readersOpened.With(labels),
ReadersClosed: mp.readersClosed.With(labels),
}
return lm
}
func mergeMaps(a, b map[string]string) map[string]string {
res := make(map[string]string)
for k, v := range a {
res[k] = v
}
for k, v := range b {
res[k] = v
}
return res
}