commit | 02b244e1501cf36f5f4bf232deeef65eb9a651ff | [log] [tgz] |
---|---|---|
author | Matteo Merli <mmerli@apache.org> | Fri Oct 09 09:14:38 2020 -0700 |
committer | GitHub <noreply@github.com> | Sat Oct 10 00:14:38 2020 +0800 |
tree | 1cd74e37fcc15a9933f01b355a952bbd13700d24 | |
parent | c03f45fe8191cffbad9f1f658b06bba8e8ddbd4b [diff] |
Fix producer deadlock after write failure (#378) ### Motivation There is a deadlock that can happen in Go client when the client has a write failure and tries to process that. The issue is that Go mutexes are not re-entrant and we trigger a connection.Close() while already holding the connection mutex. ``` goroutine 1077 [semacquire, 83 minutes]: sync.runtime_SemacquireMutex(0xc00c31fb04, 0xc110a12000, 0x1) /usr/local/go/src/runtime/sema.go:71 +0x47 sync.(*Mutex).lockSlow(0xc00c31fb00) /usr/local/go/src/sync/mutex.go:138 +0xfc sync.(*Mutex).Lock(...) /usr/local/go/src/sync/mutex.go:81 github.com/apache/pulsar-client-go/pulsar/internal.(*connection).Close(0xc00c31fb00) /go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:718 +0x547 github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt(0xc0033926e0, 0xc09ba0fe00) /go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/producer_partition.go:475 +0x6f0 github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt(0xc00c31fb00, 0xc09ba0fe00) /go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:588 +0xee github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand(0xc00c31fb00, 0xc00e40e8c0, 0x0, 0x0) /go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:507 +0x1ce github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc00c31fb00) /go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:368 +0x2db github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1(0xc00c31fb00) /go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:230 +0x71 created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start /go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:226 +0x3f ``` ### Modifications We don't need to hold the connection lock while the producer is processing the write failure. Releasing the lock earlier is fixing the problem.
A Go client library for the Apache Pulsar project.
This projects is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.
Once feature parity and stability are reached, this will supersede the current CGo based library.
Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.
Import the client library:
import "github.com/apache/pulsar-client-go/pulsar"
Create a Producer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
Create a Consumer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
defer consumer.Close()
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
Create a Reader:
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.
Name | Scope | |||
---|---|---|---|---|
users@pulsar.apache.org | User-related discussions | Subscribe | Unsubscribe | Archives |
dev@pulsar.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |
Pulsar slack channel #dev-go
at https://apache-pulsar.slack.com/
You can self-register at https://apache-pulsar.herokuapp.com/
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0