commit | a7f28644271add6708abc7269a1f02062c1f64ff | |
---|---|---|
author | Daniel Blankensteiner <dba@danskecommodities.com> | Fri Aug 30 13:54:46 2019 +0200 |
committer | Daniel Blankensteiner <dba@danskecommodities.com> | Fri Aug 30 13:54:46 2019 +0200 |
tree | 6ac3d2d37e00e1fd70323c344170ce006f766de4 | |
parent | 5ced4f9e42ed0f1deca5e4284065d51396c32e0b | |
Throw a ConfigurationException when trying to create a consumer, reader or producer without setting the required options. Let the user specify the retry interval on reconnects and faulted operations.
Native .NET/C# client library for Apache Pulsar
DotPulsar is written entirely in C# and implements Apache Pulsar's binary protocol. The alternatives were to wrap the C++ client library (which is what the Python and Go clients do) or to build on top of the WebSocket API. We decided to implement the binary protocol to gain full control and maximize portability and performance.
DotPulsar's API is strongly inspired by Apache Pulsar's official Java client, but a 100% match is not a goal.
Let's see how to produce, consume and read messages.
Producers can be created via the extension method shown below, which follows the API from the Java client:
    var client = PulsarClient.Builder().Build();
    var producer = client.NewProducer()
        .Topic("persistent://public/default/mytopic")
        .Create();
    await producer.Send(System.Text.Encoding.UTF8.GetBytes("Hello World"));
If you are not a fan of extension methods and builders, then there is another option:
    var client = PulsarClient.Builder().Build();
    var producerOptions = new ProducerOptions
    {
        ProducerName = "MyProducer",
        Topic = "persistent://public/default/mytopic"
    };
    var producer = client.CreateProducer(producerOptions);
The examples above only specify the data being sent, but you can also specify metadata:
    var data = Encoding.UTF8.GetBytes("Hello World");
    var messageId = await producer.NewMessage()
        .Property("SomeKey", "SomeValue") // EventTime and SequenceId can also be set
        .Send(data);
If you are not a fan of extension methods and builders, then there is another option:
    var data = Encoding.UTF8.GetBytes("Hello World");
    var metadata = new MessageMetadata(); // EventTime and SequenceId can be set via properties
    metadata["SomeKey"] = "SomeValue";
    var messageId = await producer.Send(metadata, data);
Consumers can be created via the extension method shown below, which follows the API from the Java client:
    var client = PulsarClient.Builder().Build();
    var consumer = client.NewConsumer()
        .SubscriptionName("MySubscription")
        .Topic("persistent://public/default/mytopic")
        .Create();
    var message = await consumer.Receive();
    Console.WriteLine("Received Message: " + Encoding.UTF8.GetString(message.Data.ToArray()));
    await consumer.Acknowledge(message);
If you are not a fan of extension methods and builders, then there is another option:
    var client = PulsarClient.Builder().Build();
    var consumerOptions = new ConsumerOptions
    {
        SubscriptionName = "MySubscription",
        Topic = "persistent://public/default/mytopic"
    };
    var consumer = client.CreateConsumer(consumerOptions);
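The first consumer example above decodes the payload with `message.Data.ToArray()`. The following is a minimal sketch of that encode/decode round trip without a broker. It assumes that `message.Data` is a `ReadOnlySequence<byte>` (the text does not state the type), and `PayloadExample`, `Encode` and `Decode` are hypothetical helper names, not part of DotPulsar.

```csharp
using System;
using System.Buffers;
using System.Text;

// Hypothetical helper, not part of DotPulsar.
public static class PayloadExample
{
    // Encode a string the same way the producer examples do.
    public static byte[] Encode(string text) => Encoding.UTF8.GetBytes(text);

    // Decode a payload exposed as a ReadOnlySequence<byte>
    // (assumed to be the type of message.Data).
    public static string Decode(ReadOnlySequence<byte> data)
    {
        // BuffersExtensions.ToArray copies every segment into one contiguous array,
        // which is what Encoding.GetString(byte[]) expects.
        byte[] bytes = data.ToArray();
        return Encoding.UTF8.GetString(bytes);
    }
}
```

A sequence may consist of several non-contiguous segments, which is why the copy into a single array is needed before decoding.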
Readers can be created via the extension method shown below, which follows the API from the Java client:
    var client = PulsarClient.Builder().Build();
    var reader = client.NewReader()
        .StartMessageId(MessageId.Earliest)
        .Topic("persistent://public/default/mytopic")
        .Create();
    var message = await reader.Receive();
    Console.WriteLine("Received Message: " + Encoding.UTF8.GetString(message.Data.ToArray()));
If you are not a fan of extension methods and builders, then there is another option:
    var client = PulsarClient.Builder().Build();
    var readerOptions = new ReaderOptions
    {
        StartMessageId = MessageId.Earliest,
        Topic = "persistent://public/default/mytopic"
    };
    var reader = client.CreateReader(readerOptions);
Consumers, producers and readers all have states that can be monitored. Let's have a look at what states they can have.
Monitoring the state is easy, so let's see how to monitor when a consumer changes state:
    private static async Task MonitorConsumerState(IConsumer consumer, CancellationToken cancellationToken)
    {
        var state = ConsumerState.Disconnected;

        while (true)
        {
            state = await consumer.StateChangedFrom(state, cancellationToken);

            switch (state)
            {
                case ConsumerState.Active:
                    Console.WriteLine("Consumer is active");
                    break;
                case ConsumerState.Inactive:
                    Console.WriteLine("Consumer is inactive");
                    break;
                case ConsumerState.Disconnected:
                    Console.WriteLine("Consumer is disconnected");
                    break;
                case ConsumerState.Closed:
                    Console.WriteLine("Consumer has closed");
                    break;
                case ConsumerState.ReachedEndOfTopic:
                    Console.WriteLine("Consumer has reached end of topic");
                    break;
                case ConsumerState.Faulted:
                    Console.WriteLine("Consumer has faulted");
                    break;
            }

            if (consumer.IsFinalState(state))
                return;
        }
    }
Here the variable 'state' will contain the new state. You can monitor going from a state (StateChangedFrom) or to a state (StateChangedTo). Some states are final, meaning the state can no longer change. For consumers, 'Closed', 'Faulted' and 'ReachedEndOfTopic' are final states. When the consumer enters a final state, all monitoring tasks complete. For example, if you are monitoring going to 'Disconnected' and the consumer is 'Closed', your task will complete and return 'Closed'.
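The final-state rule can be illustrated with a small self-contained model. This is only a sketch of the semantics described above, not DotPulsar's implementation: `State`, `StateMonitor` and `Set` are hypothetical names, and cancellation is left out for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical states mirroring the consumer states named in the text.
public enum State { Disconnected, Active, Inactive, Closed, Faulted, ReachedEndOfTopic }

// Hypothetical stand-in for a consumer's state tracking.
public sealed class StateMonitor
{
    private sealed class Waiter
    {
        public Func<State, bool> Match;
        public TaskCompletionSource<State> Completion;
    }

    private readonly object _lock = new object();
    private readonly List<Waiter> _waiters = new List<Waiter>();
    private State _current = State.Disconnected;

    public static bool IsFinalState(State state) =>
        state == State.Closed || state == State.Faulted || state == State.ReachedEndOfTopic;

    // Completes once the state differs from 'state' (immediately if it already does).
    public Task<State> StateChangedFrom(State state) => Wait(s => s != state);

    // Completes once the state equals 'state' (immediately if it already does).
    public Task<State> StateChangedTo(State state) => Wait(s => s == state);

    public void Set(State next)
    {
        var ready = new List<TaskCompletionSource<State>>();
        lock (_lock)
        {
            if (IsFinalState(_current)) return; // a final state can no longer change
            _current = next;
            // A final state releases every waiter; otherwise only the matching ones.
            for (var i = _waiters.Count - 1; i >= 0; i--)
            {
                if (IsFinalState(next) || _waiters[i].Match(next))
                {
                    ready.Add(_waiters[i].Completion);
                    _waiters.RemoveAt(i);
                }
            }
        }
        // Complete the tasks outside the lock.
        foreach (var completion in ready) completion.TrySetResult(next);
    }

    private Task<State> Wait(Func<State, bool> match)
    {
        lock (_lock)
        {
            // Already satisfied, or nothing can change anymore: return the current state.
            if (match(_current) || IsFinalState(_current)) return Task.FromResult(_current);
            var completion = new TaskCompletionSource<State>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Add(new Waiter { Match = match, Completion = completion });
            return completion.Task;
        }
    }
}
```

In this model, a task obtained from `StateChangedTo(State.Disconnected)` while the state is 'Active' stays pending until `Set(State.Closed)` is called, at which point it completes with 'Closed', matching the behaviour described for consumers.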
We use SemVer for versioning. For the versions available, see the tags on this repository.
See also the list of contributors who participated in this project.
This project is licensed under the Apache License Version 2.0 - see the LICENSE file for details.
1.0.0
X.X.X //Future