commit | 0f3e504604aa00c208d3f5946436db2e4837b7fd | [log] [tgz] |
---|---|---|
author | dferstay <dferstay@users.noreply.github.com> | Fri Jun 25 16:36:38 2021 -0700 |
committer | GitHub <noreply@github.com> | Fri Jun 25 16:36:38 2021 -0700 |
tree | f58583cd8f9d8d66d70af82f4f964f355cc84388 | |
parent | 641cb9da88695f5dcac5cc7028577a519df4d21a [diff] |
Producer respects Context passed to Send() and SendAsync() when applying backpressure (#534) Previously, the Producer ignored the context passed to Send() and SendAsync(). Now, the Producer respects the context in the case where the ProducerOptions.MaxPendingMessages limit is reached. In this case, the producer will block until a permit for sending the message is available or until the context expires, whichever comes first. Failures to send messages due to context expiration are communicated to callers via the existing TimeoutError error code. Signed-off-by: Daniel Ferstay <dferstay@splunk.com> Co-authored-by: Daniel Ferstay <dferstay@splunk.com>
A Go client library for the Apache Pulsar project.
This projects is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.
Once feature parity and stability are reached, this will supersede the current CGo based library.
Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.
Import the client library:
import "github.com/apache/pulsar-client-go/pulsar"
Create a Producer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
Create a Consumer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
defer consumer.Close()
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
Create a Reader:
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.
Name | Scope | |||
---|---|---|---|---|
users@pulsar.apache.org | User-related discussions | Subscribe | Unsubscribe | Archives |
dev@pulsar.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |
Pulsar slack channel #dev-go
at https://apache-pulsar.slack.com/
You can self-register at https://apache-pulsar.herokuapp.com/
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0