commit | bd19458b32ff89206c135cc647336e690e99c32f | [log] [tgz] |
---|---|---|
author | ming <itestmycode@gmail.com> | Fri Jul 29 01:00:33 2022 -0400 |
committer | GitHub <noreply@github.com> | Fri Jul 29 00:00:33 2022 -0500 |
tree | 73592d86d42291a093a91badc23712885eb0341e | |
parent | 6a8e7f39aac100a285a2c190186e38b73a5c9d34 [diff] |
[issue 814] consumer and producer reconnect failure metrics counter (#815) * consumer and producer reconnect failure metrics counter * increment on every reconnect failure * producer consumer reconnect max retry counter Implement #814 ### Motivation In a Pulsar cluster's kubernetes deployment or a deployment with Proxy/LB in the front, we need metrics counter to track the re-connection failure producers and consumers. When brokers go offline but the proxy/LB is still functioning, TCP connection can still be established but the topic look up failed. pulsar_client_connections_establishment_errors counter is not incremented in this case. Therefore new counters are required to track such failure cases. ### Modifications Two new counter metrics `pulsar_client_producers_reconnect_failure` and `pulsar_client_consumers_reconnect_failure` will be incremented at the producer_partition and consumer_partition retry failure code block. Two new counter metrics `pulsar_client_producers_reconnect_max_retry` and `pulsar_client_consumers_reconnect_max_retry` will be incremented at the producer_partition and consumer_partition when either the max retry or max back off is reached. The existing code logic already covers the case when the topic does not exist. The counters will not be pegged if the topic does not exist. It simply exists from the retry loop at once. ### Verifying this change This has been verified in the Pulsar cluster deployment with Proxy. We do not have such set up in CI because it's not possible to test with Pulsar standalone mode. ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): ( no) - The public API: ( no) - The schema: (no) - The default values of configurations: ( no) - The wire protocol: (no) ### Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
A Go client library for the Apache Pulsar project.
This projects is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.
Once feature parity and stability are reached, this will supersede the current CGo based library.
Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.
Import the client library:
import "github.com/apache/pulsar-client-go/pulsar"
Create a Producer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
} else {
fmt.Println("Published message")
}
Create a Consumer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
defer consumer.Close()
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
Create a Reader:
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
Build the sources:
go build ./pulsar
Run the unit tests:
./docker-ci.sh
Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.
Name | Scope | |||
---|---|---|---|---|
users@pulsar.apache.org | User-related discussions | Subscribe | Unsubscribe | Archives |
dev@pulsar.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |
Pulsar slack channel #dev-go
at https://apache-pulsar.slack.com/
You can self-register at https://apache-pulsar.herokuapp.com/
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0