commit | ccd8af549dd65e10004f7b748e6a90f199eeec6f | [log] [tgz] |
---|---|---|
author | Daniel Blankensteiner <dba@danskecommodities.com> | Tue Aug 27 09:26:55 2019 +0200 |
committer | Daniel Blankensteiner <dba@danskecommodities.com> | Tue Aug 27 09:26:55 2019 +0200 |
tree | 5a34576128317d3fa804a934a932e9fa45bb01d5 | |
parent | 549f14848926be5199e4aec2be9e9206748f442e [diff] |
Support delayed delivery
Native .NET/C# client library for Apache Pulsar
DotPulsar is written entirely in C# and implement Apache Pulsar's binary protocol. Other options was using the C++ client library (which is what the Python client and Go client do) or build on top of the WebSocket API. We decided to implement the binary protocol in order to gain full control and maximize portability and performance.
DotPulsar‘s API is strongly inspired by Apache Pulsar’s official Java client, but a 100% match is not goal.
Let's see how to produce, consume and read messages.
Producers can be created via the extension method show below, which follows the API from the Java client:
var client = PulsarClient.Builder().Build(); var producer = client.NewProducer().Topic("persistent://public/default/mytopic").Create(); await producer.Send(System.Text.Encoding.UTF8.GetBytes("Hello World"));
If you are not a fan of extensions methods and builders, then there is another option:
var client = PulsarClient.Builder().Build(); var producerOptions = new ProducerOptions { ProducerName = "MyProducer", Topic = "persistent://public/default/mytopic" }; var producer = client.CreateProducer(producerOptions);
In the above you only specify the data being sent, but you can also specify metadata:
var data = Encoding.UTF8.GetBytes("Hello World"); var messageId = await producer.NewMessage() .Property("SomeKey", "SomeValue") //EventTime and SequenceId can also be set .Send(data);
If you are not a fan of extensions methods and builders, then there is another option:
var data = Encoding.UTF8.GetBytes("Hello World"); var metadata = new MessageMetadata(); //EventTime and SequenceId can be set via properties metadata["SomeKey"] = "SomeValue"; var messageId = await producer.Send(metadata, data));
Consumers can be created via the extension method show below, which follows the API from the Java client:
var client = PulsarClient.Builder().Build(); var consumer = client.NewConsumer() .SubscriptionName("MySubscription") .Topic("persistent://public/default/mytopic") .Create(); var message = await consumer.Receive(); Console.WriteLine("Received Message: " + Encoding.UTF8.GetString(message.Data.ToArray())); await consumer.Acknowledge(message);
If you are not a fan of extensions methods and builders, then there is another option:
var client = PulsarClient.Builder().Build(); var consumerOptions = new ConsumerOptions { SubscriptionName = "MySubscription", Topic = "persistent://public/default/mytopic" }; var consumer = client.CreateConsumer(consumerOptions);
Readers can be created via the extension method show below, which follows the API from the Java client:
var client = PulsarClient.Builder().Build(); var reader = client.NewReader() .StartMessageId(MessageId.Earliest) .Topic("persistent://public/default/mytopic") .Create(); var message = await reader.Receive(); Console.WriteLine("Received Message: " + Encoding.UTF8.GetString(message.Data.ToArray()));
If you are not a fan of extensions methods and builders, then there is another option:
var client = PulsarClient.Builder().Build(); var readerOptions = new ReaderOptions { StartMessageId = MessageId.Earliest, Topic = "persistent://public/default/mytopic" }; var reader = client.CreateReader(readerOptions);
Consumers, producers and readers all have states that can be monitored. Let's have a look at what states they can have.
Monitoring the state is easy, so let's see how to monitor when a consumer changes state:
private static async Task MonitorConsumerState(IConsumer consumer, CancellationToken cancellationToken) { var state = ConsumerState.Disconnected; while (true) { state = await consumer.StateChangedFrom(state, cancellationToken); switch (state) { case ConsumerState.Active: Console.WriteLine("Consumer is active"); break; case ConsumerState.Inactive: Console.WriteLine("Consumer is inactive"); break; case ConsumerState.Disconnected: Console.WriteLine("Consumer is disconnected"); break; case ConsumerState.Closed: Console.WriteLine("Consumer has closed"); break; case ConsumerState.ReachedEndOfTopic: Console.WriteLine("Consumer has reached end of topic"); break; case ConsumerState.Faulted: Console.WriteLine("Consumer has faulted"); break; } if (consumer.IsFinalState(state)) return; } }
Here the variable ‘state’ will contained to new state. You can both monitor going From (StateChangedFrom) and To (StateChangedTo) a state. Some states are final, meaning the state can no longer change. For consumers ‘Closed’, ‘Faulted’ and ‘ReachedEndOfTopic’ are final states. When the consumer enter a final state, all monitoring tasks are completed. So if you e.g. are monitoring going to ‘Diconnected’ and the consumer is ‘Closed’, then you task will complete and return ‘Closed’.
We use SemVer for versioning. For the versions available, see the tags on this repository.
See also the list of contributors who participated in this project.
This project is licensed under the Apache License Version 2.0 - see the LICENSE file for details.
1.0.0
X.X.X //Future