commit | 0296890a9136a0921da19442cf713a4ebb31f9b0 | [log] [tgz] |
---|---|---|
author | Matteo Merli <mmerli@apache.org> | Sat Dec 12 00:02:17 2020 -0800 |
committer | GitHub <noreply@github.com> | Sat Dec 12 16:02:17 2020 +0800 |
tree | bc02051ad0c05a037bd075c2b85bc6f1efd8ce23 | |
parent | 110b99d1c88f3d34e9edbe40d957609421d35c25 [diff] |
Fixed logic to attempt reconnections to same broker (#414) ### Motivation There is a problem with the re-connection logic introduced in #157. The change added a logic to keep retrying to establish a TCP connection with broker up to the "operation timeout" (default 30seconds). There are few issues with it: 1. (minor) It's not checking that the error is indeed a TCP error (eg: it would retry on auth failures too) 2. (major) After a TCP connection failure, reconnecting to the same broker is always the wrong approach, because the most likely outcome is that the next attempt will also fail and, worse, the IP might just be unresponsive and we will then have to wait for the full connection timeout time. The correct solution after a connection failure is to re-do the topic lookup, since the topic will be moving to a different broker and we need to reconnect to the new broker asap. The only time we can do this connection retry logic is for requests that are not specific to a particular broker (eg: lookup operations). In this case a quick retry on a connection failure will probably land the request on a different, healthy, broker.
A Go client library for the Apache Pulsar project.
This projects is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.
Once feature parity and stability are reached, this will supersede the current CGo based library.
Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.
Import the client library:
import "github.com/apache/pulsar-client-go/pulsar"
Create a Producer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
Create a Consumer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
defer consumer.Close()
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
Create a Reader:
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.
Name | Scope | |||
---|---|---|---|---|
users@pulsar.apache.org | User-related discussions | Subscribe | Unsubscribe | Archives |
dev@pulsar.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |
Pulsar slack channel #dev-go
at https://apache-pulsar.slack.com/
You can self-register at https://apache-pulsar.herokuapp.com/
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0