commit | a5f06793bd4365012641a79825267a51d4ce43c9 | [log] [tgz] |
---|---|---|
author | Zike Yang <zike@apache.org> | Wed Feb 28 18:39:00 2024 +0800 |
committer | Zike Yang <zike@apache.org> | Thu Feb 29 10:10:49 2024 +0800 |
tree | 38b145ad698edf070d5205d7f45145aa0138c5fa | |
parent | 9fdefe2bbd2ecdd7c9f853580e829cd66f22b78e [diff] |
[fix] Fix Infinite Loop in Reader's `HasNext` Function (#1182) Fixes #1171 ### Motivation If `getLastMessageId` continually fails, the reader.HasNext can get stuck in an infinite loop. Without any backoff, the reader would keep trying forever. ### Modifications - Implemented a backoff policy for `getLastMessageID`. - If HasNext fails, it now returns false. #### Should the reader.HasNext returned `false` in case of failure? Currently, the `HasNext` method doesn't report errors. However, failure is still possible. For instance, if `getLastMessageID` repeatedly fails and hits the retry limit. An option is to keep trying forever, but this would stall all user code. This isn't user-friendly, so I rejected this solution. #### Couldn't utilize the BackOffPolicy in the Reader Options The `HasNext` retry mechanism requires to use of `IsMaxBackoffReached` for the backoff. But it isn't exposed in the `BackOffPolicy` interface. Introducing a new method to the `BackOffPolicy` would introduce breaking changes for the user backoff implementation. So, I choose not to implement it. Before we do it, we need to refine the BackOffPolicy. (cherry picked from commit 88a8d85cf6d6a4f282a5b39a2140a7bb06ba0f3b)
A Go client library for Apache Pulsar. For the supported Pulsar features, see Client Feature Matrix.
This project is a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.
Once feature parity and stability are reached, this will supersede the current CGo-based library.
Note:
While this library should work with Golang versions as early as 1.16, any bugs specific to versions earlier than 1.18 may not be fixed.
Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.
Import the client library:
import "github.com/apache/pulsar-client-go/pulsar"
Create a Producer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
} else {
fmt.Println("Published message")
}
Create a Consumer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
defer consumer.Close()
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
Create a Reader:
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
Build the sources:
make build
Run the tests:
make test
Run the tests with specific versions of GOLANG and PULSAR:
make test GOLANG_VERSION=1.18 PULSAR_VERSION=2.10.0
Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.
If your contribution adds Pulsar features for Go clients, you need to update both the Pulsar docs and the Client Feature Matrix. See Contribution Guide for more details.
Name | Scope | |||
---|---|---|---|---|
users@pulsar.apache.org | User-related discussions | Subscribe | Unsubscribe | Archives |
dev@pulsar.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |
Pulsar slack channel #dev-go
at https://apache-pulsar.slack.com/
You can self-register at https://apache-pulsar.herokuapp.com/
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0
If you've upgraded from a previous version of this library, you may run into an ‘ambiguous import’ error when building.
github.com/apache/pulsar-client-go/oauth2: ambiguous import: found package github.com/apache/pulsar-client-go/oauth2 in multiple modules
The fix for this is to make sure you don't have any references in your go.mod
file to the old oauth2 module path. So remove any lines similar to the following, and then run go mod tidy
.
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20220630195735-e95cf0633348 // indirect