diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index fac9d4b..50fa3c4 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1155,6 +1155,10 @@
 		if maxRetry > 0 {
 			maxRetry--
 		}
+		pc.metrics.ConsumersReconnectFailure.Inc()
+		if maxRetry == 0 || backoff.IsMaxBackoffReached() {
+			pc.metrics.ConsumersReconnectMaxRetry.Inc()
+		}
 	}
 }
 
diff --git a/pulsar/internal/backoff.go b/pulsar/internal/backoff.go
index f172bdc..ff9b0bc 100644
--- a/pulsar/internal/backoff.go
+++ b/pulsar/internal/backoff.go
@@ -51,3 +51,8 @@
 
 	return b.backoff + time.Duration(jitter)
 }
+
+// IsMaxBackReached evaluates if the max number of retries is reached
+func (b *Backoff) IsMaxBackoffReached() bool {
+	return b.backoff >= maxBackoff
+}
diff --git a/pulsar/internal/backoff_test.go b/pulsar/internal/backoff_test.go
index 46c9211..ad6e764 100644
--- a/pulsar/internal/backoff_test.go
+++ b/pulsar/internal/backoff_test.go
@@ -42,6 +42,7 @@
 		// the jitter introduces at most 20% difference so delay is less than twice the previous value
 		assert.LessOrEqual(t, int64(float64(delay)*.8), int64(2*float64(previousDelay)))
 		previousDelay = delay
+		assert.Equal(t, false, backoff.IsMaxBackoffReached())
 	}
 }
 
@@ -54,6 +55,7 @@
 
 	cappedDelay := backoff.Next()
 	assert.GreaterOrEqual(t, int64(cappedDelay), int64(maxBackoff))
+	assert.Equal(t, true, backoff.IsMaxBackoffReached())
 	// max value is 60 seconds + 20% jitter = 72 seconds
 	assert.LessOrEqual(t, int64(cappedDelay), int64(72*time.Second))
 }
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index 1cab470..a2af33c 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -40,14 +40,18 @@
 	dlqCounter         *prometheus.CounterVec
 	processingTime     *prometheus.HistogramVec
 
-	producersOpened     *prometheus.CounterVec
-	producersClosed     *prometheus.CounterVec
-	producersPartitions *prometheus.GaugeVec
-	consumersOpened     *prometheus.CounterVec
-	consumersClosed     *prometheus.CounterVec
-	consumersPartitions *prometheus.GaugeVec
-	readersOpened       *prometheus.CounterVec
-	readersClosed       *prometheus.CounterVec
+	producersOpened            *prometheus.CounterVec
+	producersClosed            *prometheus.CounterVec
+	producersReconnectFailure  *prometheus.CounterVec
+	producersReconnectMaxRetry *prometheus.CounterVec
+	producersPartitions        *prometheus.GaugeVec
+	consumersOpened            *prometheus.CounterVec
+	consumersClosed            *prometheus.CounterVec
+	consumersReconnectFailure  *prometheus.CounterVec
+	consumersReconnectMaxRetry *prometheus.CounterVec
+	consumersPartitions        *prometheus.GaugeVec
+	readersOpened              *prometheus.CounterVec
+	readersClosed              *prometheus.CounterVec
 
 	// Metrics that are not labeled with specificity are immediately available
 	ConnectionsOpened                     prometheus.Counter
@@ -78,14 +82,18 @@
 	DlqCounter         prometheus.Counter
 	ProcessingTime     prometheus.Observer
 
-	ProducersOpened     prometheus.Counter
-	ProducersClosed     prometheus.Counter
-	ProducersPartitions prometheus.Gauge
-	ConsumersOpened     prometheus.Counter
-	ConsumersClosed     prometheus.Counter
-	ConsumersPartitions prometheus.Gauge
-	ReadersOpened       prometheus.Counter
-	ReadersClosed       prometheus.Counter
+	ProducersOpened            prometheus.Counter
+	ProducersClosed            prometheus.Counter
+	ProducersReconnectFailure  prometheus.Counter
+	ProducersReconnectMaxRetry prometheus.Counter
+	ProducersPartitions        prometheus.Gauge
+	ConsumersOpened            prometheus.Counter
+	ConsumersClosed            prometheus.Counter
+	ConsumersReconnectFailure  prometheus.Counter
+	ConsumersReconnectMaxRetry prometheus.Counter
+	ConsumersPartitions        prometheus.Gauge
+	ReadersOpened              prometheus.Counter
+	ReadersClosed              prometheus.Counter
 }
 
 func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]string) *Metrics {
@@ -175,6 +183,18 @@
 			ConstLabels: constLabels,
 		}, metricsLevelLabels),
 
+		producersReconnectFailure: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name:        "pulsar_client_producers_reconnect_failure",
+			Help:        "Counter of reconnect failure of producers",
+			ConstLabels: constLabels,
+		}, metricsLevelLabels),
+
+		producersReconnectMaxRetry: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name:        "pulsar_client_producers_reconnect_max_retry",
+			Help:        "Counter of producer reconnect max retry reached",
+			ConstLabels: constLabels,
+		}, metricsLevelLabels),
+
 		consumersOpened: prometheus.NewCounterVec(prometheus.CounterOpts{
 			Name:        "pulsar_client_consumers_opened",
 			Help:        "Counter of consumers created by the client",
@@ -187,6 +207,18 @@
 			ConstLabels: constLabels,
 		}, metricsLevelLabels),
 
+		consumersReconnectFailure: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name:        "pulsar_client_consumers_reconnect_failure",
+			Help:        "Counter of reconnect failure of consumers",
+			ConstLabels: constLabels,
+		}, metricsLevelLabels),
+
+		consumersReconnectMaxRetry: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name:        "pulsar_client_consumers_reconnect_max_retry",
+			Help:        "Counter of consumer reconnect max retry reached",
+			ConstLabels: constLabels,
+		}, metricsLevelLabels),
+
 		consumersPartitions: prometheus.NewGaugeVec(prometheus.GaugeOpts{
 			Name:        "pulsar_client_consumers_partitions_active",
 			Help:        "Counter of individual partitions the consumers are currently active",
@@ -399,6 +431,18 @@
 			metrics.producersClosed = are.ExistingCollector.(*prometheus.CounterVec)
 		}
 	}
+	err = prometheus.DefaultRegisterer.Register(metrics.producersReconnectFailure)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.producersReconnectFailure = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.producersReconnectMaxRetry)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.producersReconnectMaxRetry = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
 	err = prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
 	if err != nil {
 		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
@@ -417,6 +461,18 @@
 			metrics.consumersClosed = are.ExistingCollector.(*prometheus.CounterVec)
 		}
 	}
+	err = prometheus.DefaultRegisterer.Register(metrics.consumersReconnectFailure)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.consumersReconnectFailure = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.consumersReconnectMaxRetry)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.consumersReconnectMaxRetry = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
 	err = prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
 	if err != nil {
 		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
@@ -517,14 +573,18 @@
 		DlqCounter:         mp.dlqCounter.With(labels),
 		ProcessingTime:     mp.processingTime.With(labels),
 
-		ProducersOpened:     mp.producersOpened.With(labels),
-		ProducersClosed:     mp.producersClosed.With(labels),
-		ProducersPartitions: mp.producersPartitions.With(labels),
-		ConsumersOpened:     mp.consumersOpened.With(labels),
-		ConsumersClosed:     mp.consumersClosed.With(labels),
-		ConsumersPartitions: mp.consumersPartitions.With(labels),
-		ReadersOpened:       mp.readersOpened.With(labels),
-		ReadersClosed:       mp.readersClosed.With(labels),
+		ProducersOpened:            mp.producersOpened.With(labels),
+		ProducersClosed:            mp.producersClosed.With(labels),
+		ProducersReconnectFailure:  mp.producersReconnectFailure.With(labels),
+		ProducersReconnectMaxRetry: mp.producersReconnectMaxRetry.With(labels),
+		ProducersPartitions:        mp.producersPartitions.With(labels),
+		ConsumersOpened:            mp.consumersOpened.With(labels),
+		ConsumersClosed:            mp.consumersClosed.With(labels),
+		ConsumersReconnectFailure:  mp.consumersReconnectFailure.With(labels),
+		ConsumersReconnectMaxRetry: mp.consumersReconnectMaxRetry.With(labels),
+		ConsumersPartitions:        mp.consumersPartitions.With(labels),
+		ReadersOpened:              mp.readersOpened.With(labels),
+		ReadersClosed:              mp.readersClosed.With(labels),
 	}
 
 	return lm
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index ec92415..210a929 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -419,6 +419,10 @@
 		if maxRetry > 0 {
 			maxRetry--
 		}
+		p.metrics.ProducersReconnectFailure.Inc()
+		if maxRetry == 0 || backoff.IsMaxBackoffReached() {
+			p.metrics.ProducersReconnectMaxRetry.Inc()
+		}
 	}
 }
 
