Fix missing metrics for topics by registration of existing collector (#600)

diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index d44522d..6902ca9 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -282,39 +282,186 @@
 		}),
 	}
 
-	prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
-	prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
-	prometheus.DefaultRegisterer.Register(metrics.messagesPending)
-	prometheus.DefaultRegisterer.Register(metrics.bytesPending)
-	prometheus.DefaultRegisterer.Register(metrics.publishErrors)
-	prometheus.DefaultRegisterer.Register(metrics.publishLatency)
-	prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
-
-	prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
-	prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
-	prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
-	prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
-	prometheus.DefaultRegisterer.Register(metrics.acksCounter)
-	prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
-	prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
-	prometheus.DefaultRegisterer.Register(metrics.processingTime)
-
-	prometheus.DefaultRegisterer.Register(metrics.producersOpened)
-	prometheus.DefaultRegisterer.Register(metrics.producersClosed)
-	prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
-	prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
-	prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
-	prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
-	prometheus.DefaultRegisterer.Register(metrics.readersOpened)
-	prometheus.DefaultRegisterer.Register(metrics.readersClosed)
-
-	prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
-	prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
-	prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
-	prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
-	prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
-	prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
-	prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
+	err := prometheus.DefaultRegisterer.Register(metrics.messagesPublished)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.messagesPublished = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.bytesPublished)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.bytesPublished = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.messagesPending)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.messagesPending = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.bytesPending)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.bytesPending = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.publishErrors)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.publishErrors = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.publishLatency)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.publishLatency = are.ExistingCollector.(*prometheus.HistogramVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.publishRPCLatency)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.publishRPCLatency = are.ExistingCollector.(*prometheus.HistogramVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.messagesReceived)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.messagesReceived = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.bytesReceived)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.bytesReceived = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.prefetchedMessages)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.prefetchedMessages = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.prefetchedBytes)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.prefetchedBytes = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.acksCounter)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.acksCounter = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.nacksCounter)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.nacksCounter = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.dlqCounter)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.dlqCounter = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.processingTime)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.processingTime = are.ExistingCollector.(*prometheus.HistogramVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.producersOpened)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.producersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.producersClosed)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.producersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.producersPartitions)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.producersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.consumersOpened)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.consumersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.consumersClosed)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.consumersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.consumersPartitions)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.consumersPartitions = are.ExistingCollector.(*prometheus.GaugeVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.readersOpened)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.readersOpened = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.readersClosed)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.readersClosed = are.ExistingCollector.(*prometheus.CounterVec)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsOpened)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.ConnectionsOpened = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsClosed)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.ConnectionsClosed = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.ConnectionsEstablishmentErrors = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.ConnectionsHandshakeErrors = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.LookupRequestsCount = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.PartitionedTopicMetadataRequestsCount = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
+	err = prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
+	if err != nil {
+		if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+			metrics.RPCRequestCount = are.ExistingCollector.(prometheus.Counter)
+		}
+	}
 	return metrics
 }