commit | 96ce2de573ea8ad14173f41d6e48b6b48a8b0aa0 | [log] [tgz] |
---|---|---|
author | dferstay <dferstay@users.noreply.github.com> | Thu Jun 24 02:27:12 2021 -0700 |
committer | GitHub <noreply@github.com> | Thu Jun 24 11:27:12 2021 +0200 |
tree | 7840420bc96b77179b82c166869daf40b3d6b71d | |
parent | f33b594034c9433faae78a144983ab3c8f785add [diff] |
Fix data race while accessing connection in partitionConsumer (#535) The partitionConsumer maintains a few internal go-routines, two of which access the underlying internal.Connection. The main runEvenstLoop() go-routine reads the connection field while a separate go-routine is used to detect connnection loss, initiate reconnection, and sets the connection. Previously, access to the conn field was not synchronized. Now, the conn field is read and written atomically; resolving the data race. Signed-off-by: Daniel Ferstay <dferstay@splunk.com> Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
A Go client library for the Apache Pulsar project.
This projects is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.
Once feature parity and stability are reached, this will supersede the current CGo based library.
Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.
Import the client library:
import "github.com/apache/pulsar-client-go/pulsar"
Create a Producer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
Create a Consumer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
defer consumer.Close()
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
Create a Reader:
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.
Name | Scope | |||
---|---|---|---|---|
users@pulsar.apache.org | User-related discussions | Subscribe | Unsubscribe | Archives |
dev@pulsar.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |
Pulsar slack channel #dev-go
at https://apache-pulsar.slack.com/
You can self-register at https://apache-pulsar.herokuapp.com/
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0