commit | b8bd55bc02bdaef89774b1b3e57c9652ec69cd90 | [log] [tgz] |
---|---|---|
author | cckellogg <cckellogg@gmail.com> | Fri Nov 20 03:19:47 2020 -0800 |
committer | GitHub <noreply@github.com> | Fri Nov 20 19:19:47 2020 +0800 |
tree | f13f654ac4b89b32f4fe75b0118aeccf4281b649 | |
parent | 82e676ec34f72f8f0d1c8eda5e03fb6805530611 [diff] |
Add metric for internal publish latency (#397) The current publish latency metric measures the time when send is called until the message is acked. The metric can be skewed if batching is enabled. For example, if we call send the client will hold that message until x number of messages are collected or an interval has passed. This new metric measures the time when we send the data to the connection until we get an ack from the broker. This cuts out any batch skewing and provides a more accurate latency of what the client sees at the connection level.
A Go client library for the Apache Pulsar project.
This projects is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.
Once feature parity and stability are reached, this will supersede the current CGo based library.
Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.
Import the client library:
import "github.com/apache/pulsar-client-go/pulsar"
Create a Producer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
Create a Consumer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
defer consumer.Close()
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
Create a Reader:
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.
Name | Scope | |||
---|---|---|---|---|
users@pulsar.apache.org | User-related discussions | Subscribe | Unsubscribe | Archives |
dev@pulsar.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |
Pulsar slack channel #dev-go
at https://apache-pulsar.slack.com/
You can self-register at https://apache-pulsar.herokuapp.com/
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0