Fix producer deadlock after write failure (#378)

### Motivation

There is a deadlock that can occur in the Go client when a write fails and the client tries to handle that failure.

The issue is that Go mutexes are not re-entrant, and we trigger a `connection.Close()` while already holding the connection mutex.

```
goroutine 1077 [semacquire, 83 minutes]:
sync.runtime_SemacquireMutex(0xc00c31fb04, 0xc110a12000, 0x1)
	/usr/local/go/src/runtime/sema.go:71 +0x47
sync.(*Mutex).lockSlow(0xc00c31fb00)
	/usr/local/go/src/sync/mutex.go:138 +0xfc
sync.(*Mutex).Lock(...)
	/usr/local/go/src/sync/mutex.go:81
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).Close(0xc00c31fb00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:718 +0x547
github.com/apache/pulsar-client-go/pulsar.(*partitionProducer).ReceivedSendReceipt(0xc0033926e0, 0xc09ba0fe00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/producer_partition.go:475 +0x6f0
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).handleSendReceipt(0xc00c31fb00, 0xc09ba0fe00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:588 +0xee
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).internalReceivedCommand(0xc00c31fb00, 0xc00e40e8c0, 0x0, 0x0)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:507 +0x1ce
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).run(0xc00c31fb00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:368 +0x2db
github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start.func1(0xc00c31fb00)
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:230 +0x71
created by github.com/apache/pulsar-client-go/pulsar/internal.(*connection).start
	/go/pkg/mod/cd.splunkdev.com/streamlio/pulsar-client-go@v0.1.1-streamlio-5/pulsar/internal/connection.go:226 +0x3f

```
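The trace shows `(*connection).Close` blocked in `sync.(*Mutex).Lock` on the same goroutine that is already running `(*connection).run`. The sketch below isolates the underlying Go behaviour with only the standard library. `relockWouldBlock` is a hypothetical helper written for illustration. It uses `TryLock`, which requires Go 1.18+, so it can observe the failed re-lock without actually hanging.

```go
package main

import (
	"fmt"
	"sync"
)

// relockWouldBlock reports whether a goroutine that already holds mu would
// block if it tried to lock mu again. sync.Mutex is not re-entrant, so a
// second Lock() from the same goroutine would wait forever. TryLock (Go 1.18+)
// returns false instead of blocking, which makes this safe to demonstrate.
func relockWouldBlock() bool {
	var mu sync.Mutex
	mu.Lock()
	defer mu.Unlock()
	return !mu.TryLock()
}

func main() {
	fmt.Println("re-locking would block:", relockWouldBlock())
}
```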

### Modifications

We don't need to hold the connection lock while the producer is processing the write failure. Releasing the lock earlier fixes the problem.

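A minimal sketch of the "release the lock before calling out" pattern, assuming a simplified connection that dispatches events to a per-producer listener. This is not the actual pulsar-client-go code. `connection`, `listeners` and `handleWriteFailure` are hypothetical names chosen for illustration. The listener is looked up under the lock, the lock is released, and only then is the listener invoked, so it can call `Close()` safely.

```go
package main

import (
	"fmt"
	"sync"
)

// connection is a hypothetical, simplified stand-in for the client's
// connection type; it is not the real pulsar-client-go implementation.
type connection struct {
	mu        sync.Mutex
	closed    bool
	listeners map[uint64]func(*connection)
}

// Close takes the connection lock, so it must never be called by a
// goroutine that already holds c.mu.
func (c *connection) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
}

// handleWriteFailure (hypothetical) reads the listener map while holding
// the lock, then releases the lock before running the callback. The
// callback is therefore free to call back into c.Close().
func (c *connection) handleWriteFailure(id uint64) {
	c.mu.Lock()
	listener, ok := c.listeners[id]
	c.mu.Unlock() // release before invoking code that may re-lock c.mu

	if ok {
		listener(c)
	}
}

func main() {
	c := &connection{listeners: map[uint64]func(*connection){
		// The producer's reaction to the failed write: close the connection.
		1: func(conn *connection) { conn.Close() },
	}}
	c.handleWriteFailure(1)
	fmt.Println("closed:", c.closed)
}
```

Holding `c.mu` across `listener(c)` instead would reproduce the hang from the trace, because `Close()` would wait on a mutex its own goroutine already owns.
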
1 file changed
tree: 1cd74e37fcc15a9933f01b355a952bbd13700d24
README.md


Apache Pulsar Go Client Library

A Go client library for the Apache Pulsar project.

Goal

This project is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.

Once feature parity and stability are reached, this will supersede the current CGo-based library.

Requirements

  • Go 1.11+

Status

Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.

Usage

Import the client library:

```go
import "github.com/apache/pulsar-client-go/pulsar"
```

Create a Producer:

```go
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "my-topic",
})
if err != nil {
	log.Fatal(err)
}
// Defer Close only once the producer has been created successfully.
defer producer.Close()

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
	Payload: []byte("hello"),
})
if err != nil {
	fmt.Println("Failed to publish message", err)
} else {
	fmt.Println("Published message")
}
```

Create a Consumer:

```go
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL: "pulsar://localhost:6650",
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "my-topic",
	SubscriptionName: "my-sub",
	Type:             pulsar.Shared,
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()

msg, err := consumer.Receive(context.Background())
if err != nil {
	log.Fatal(err)
}

fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
	msg.ID(), string(msg.Payload()))

// Acknowledge the message so the broker does not redeliver it.
consumer.Ack(msg)
```

Create a Reader:

```go
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
	log.Fatal(err)
}

defer client.Close()

reader, err := client.CreateReader(pulsar.ReaderOptions{
	Topic:          "topic-1",
	StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
	log.Fatal(err)
}
defer reader.Close()

for reader.HasNext() {
	msg, err := reader.Next(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
		msg.ID(), string(msg.Payload()))
}
```

Contributing

Contributions are welcome and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.

Contact

Mailing lists

| Name | Scope | Links |
|------|-------|-------|
| users@pulsar.apache.org | User-related discussions | Subscribe, Unsubscribe, Archives |
| dev@pulsar.apache.org | Development-related discussions | Subscribe, Unsubscribe, Archives |

Slack

Pulsar slack channel #dev-go at https://apache-pulsar.slack.com/

You can self-register at https://apache-pulsar.herokuapp.com/

License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0