[issue 814] consumer and producer reconnect failure metrics counter (#815)

* consumer and producer reconnect failure metrics counter

* increment on every reconnect failure

* producer consumer reconnect max retry counter

Implement #814 

### Motivation
In a Pulsar cluster's kubernetes deployment or a deployment with Proxy/LB in the front, we need metrics counter to track the re-connection failure producers and consumers.

When brokers go offline but the proxy/LB is still functioning, TCP connection can still be established but the topic look up failed. pulsar_client_connections_establishment_errors counter is not incremented in this case.  Therefore new counters are required to track such failure cases.

### Modifications

Two new counter metrics `pulsar_client_producers_reconnect_failure` and `pulsar_client_consumers_reconnect_failure` will be incremented at the producer_partition and consumer_partition retry failure code block.

Two new counter metrics `pulsar_client_producers_reconnect_max_retry` and `pulsar_client_consumers_reconnect_max_retry` will be incremented at the producer_partition and consumer_partition when either the max retry or max back off is reached.

The existing code logic already covers the case when the topic does not exist. The counters will not be pegged if the topic does not exist. It simply exists from the retry loop at once.

### Verifying this change

This has been verified in the Pulsar cluster deployment with Proxy. We do not have such set up in CI because it's not possible to test with Pulsar standalone mode.

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API: ( no)
  - The schema: (no)
  - The default values of configurations: ( no)
  - The wire protocol: (no)

### Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  - If a feature is not applicable for documentation, explain why?
  - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
5 files changed
tree: 73592d86d42291a093a91badc23712885eb0341e
  1. .github/
  2. distribution/
  3. docs/
  4. examples/
  5. integration-tests/
  6. oauth2/
  7. perf/
  8. pulsar/
  9. .asf.yaml
  10. .gitignore
  11. .golangci.yml
  12. .header
  13. CHANGELOG.md
  14. CONTRIBUTING.md
  15. docker-ci.sh
  16. Dockerfile
  17. go.mod
  18. go.sum
  19. LICENSE
  20. NOTICE
  21. pulsar-test-service-start.sh
  22. pulsar-test-service-stop.sh
  23. README.md
  24. run-ci.sh
  25. stable.txt
  26. VERSION
README.md

GoDoc Go Report Card Language LICENSE

Apache Pulsar Go Client Library

A Go client library for the Apache Pulsar project.

Goal

This projects is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.

Once feature parity and stability are reached, this will supersede the current CGo based library.

Requirements

  • Go 1.15+

Status

Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.

Usage

Import the client library:

import "github.com/apache/pulsar-client-go/pulsar"

Create a Producer:

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})

defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "my-topic",
})

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
	Payload: []byte("hello"),
})

defer producer.Close()

if err != nil {
    fmt.Println("Failed to publish message", err)
} else {
    fmt.Println("Published message")
}

Create a Consumer:

client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: "pulsar://localhost:6650",
})

defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "my-topic",
        SubscriptionName: "my-sub",
        Type:             pulsar.Shared,
    })

defer consumer.Close()

msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
            msg.ID(), string(msg.Payload()))

Create a Reader:

client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
	log.Fatal(err)
}

defer client.Close()

reader, err := client.CreateReader(pulsar.ReaderOptions{
	Topic:          "topic-1",
	StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
	log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
	msg, err := reader.Next(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
		msg.ID(), string(msg.Payload()))
}

Build and Test

Build the sources:

go build ./pulsar

Run the unit tests:

./docker-ci.sh

Contributing

Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.

Contact

Mailing lists
NameScope
users@pulsar.apache.orgUser-related discussionsSubscribeUnsubscribeArchives
dev@pulsar.apache.orgDevelopment-related discussionsSubscribeUnsubscribeArchives
Slack

Pulsar slack channel #dev-go at https://apache-pulsar.slack.com/

You can self-register at https://apache-pulsar.herokuapp.com/

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0