commit | 16fea6d572b936253aa17abdbe1c60eb9ceb54d5 | [log] [tgz] |
---|---|---|
author | Denis Vergnes <denis.vergnes@gmail.com> | Tue Oct 20 18:00:26 2020 -0700 |
committer | GitHub <noreply@github.com> | Wed Oct 21 09:00:26 2020 +0800 |
tree | 887a2252c377a28233025695ee00db58c1a1ec58 | |
parent | 85a9fe8e1c5a464ce87551dd9b9768e6ac674d48 [diff] |
Update default router to switch partition on all batching thresholds #382 (#383) Master Issue: #382 ### Motivation The default router only switches partition when the max delay to publish has elapsed. This PR is about switching as soon as one of the threshold (max number of messages, max number of bytes, max delay) is reached. ### Modifications The default router has been updated to switch to one of the threshold. To prevent cyclic dependencies between the packages, it has been moved from internal to pulsar package. ### Verifying this change - [X ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change added unit tests to verify all cases ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): no - The public API: no - The schema: no - The default values of configurations: no - The wire protocol: no ### Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? GoDocs
A Go client library for the Apache Pulsar project.
This projects is developing a pure-Go client library for Pulsar that does not depend on the C++ Pulsar library.
Once feature parity and stability are reached, this will supersede the current CGo based library.
Check the Projects page at https://github.com/apache/pulsar-client-go/projects for tracking the status and the progress.
Import the client library:
import "github.com/apache/pulsar-client-go/pulsar"
Create a Producer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
Create a Consumer:
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
})
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
defer consumer.Close()
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
Create a Reader:
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
}
Contributions are welcomed and greatly appreciated. See CONTRIBUTING.md for details on submitting patches and the contribution workflow.
Name | Scope | |||
---|---|---|---|---|
users@pulsar.apache.org | User-related discussions | Subscribe | Unsubscribe | Archives |
dev@pulsar.apache.org | Development-related discussions | Subscribe | Unsubscribe | Archives |
Pulsar slack channel #dev-go
at https://apache-pulsar.slack.com/
You can self-register at https://apache-pulsar.herokuapp.com/
Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0