commit | 8139a2c721d0d9f66727fac16f3705bc254d9017 | [log] [tgz] |
---|---|---|
author | dferstay <dferstay@users.noreply.github.com> | Wed Jun 30 06:49:46 2021 -0700 |
committer | GitHub <noreply@github.com> | Wed Jun 30 21:49:46 2021 +0800 |
tree | 4a38d88b16d442cfdc4e3c78cd6942aae18e0e3c | |
parent | 1a9f356c125e2b15b69aa8c7a5f6707fb9ce3e51 [diff] |
Fix panic() in internal/connection when writing to a closed channel during close (#539) The race is as follows: T1 - calls SendRequestNoWait(), checks the connection state, and prepares to enter the select statement T2 - calls TriggerClose() closes cnx and the closeCh T3 - run() go-routine for processing incomingRequestsCh drops into case <-closeCh: and calls failLeftRequestsWhenClose() which drains and closes incomingRequestsCh T1 - resumes and drops into the select where both closeCh and incomingRequestsCh are closed. When two cases of a `select` are valid, the case executed is chosen at random; see https://tour.golang.org/concurrency/5 This commit introduces a connectionClosing state and a wait group to track writes by the SendRequest() methods. * TriggerClose() moves the connection into the connectionClosing state before the closeCh is closed. * The failLeftRequestsWhenClosed() method waits on the waitgroup for outstanding SendRequest() methods to complete before it closes the incomingRequestsCh * The SendRequest() methods first add to the waitgroup before checking the connection state; if the state is either closing or closed, SendRequest() returns an error. With the above it is not possible for thread to attempt to add a request to the incomingRequestsCh without being tracked by the waitgroup, and the incomingRequestsCh will not be closed until operations tracked by the waitgroup have completed. Signed-off-by: Daniel Ferstay <dferstay@splunk.com> Co-authored-by: Daniel Ferstay <dferstay@splunk.com> Co-authored-by: xiaolongran <xiaolongran@tencent.com>
A Go client library for the Apache Pulsar project.
This projects is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.
Once feature parity and stability are reached, this will supersede the current CGo based library.
Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.
Import the client library:
import "github.com/apache/pulsar-client-go/pulsar"
Create a Producer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
Create a Consumer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
defer consumer.Close()
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
Create a Reader:
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.
Name | Scope | |||
---|---|---|---|---|
users@pulsar.apache.org | User-related discussions | Subscribe | Unsubscribe | Archives |
dev@pulsar.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |
Pulsar slack channel #dev-go
at https://apache-pulsar.slack.com/
You can self-register at https://apache-pulsar.herokuapp.com/
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0