Make ready for 0.10.0
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e636dcc..f5e33f2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,7 +4,7 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [Unreleased]
+## [0.10.0] - 2020-12-16
### Added
@@ -22,7 +22,7 @@
### Changed
-- The protobuf-net dependency is upgraded from 2.4.6 to 3.X
+- The protobuf-net dependency is upgraded from 2.4.6 to 3.0.73 to get support for ReadOnlySequence\<byte\>
### Fixed
diff --git a/README.md b/README.md
index 8bd71ca..c42e740 100644
--- a/README.md
+++ b/README.md
@@ -57,7 +57,7 @@
- [X] Consumer subscription with initial position and priority level
- [X] Consumer subscription types exclusive, shared, failover and key shared
- [X] Consumer receive and single + cumulative acknowledge
-- [X] Consumer seek
+- [X] Consumer and Reader seek on message id and publish time
- [X] Consumer unsubscribe
- [X] Consume compacted topics
- [X] Reader API
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index ad52b2b..80bde82 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -29,41 +29,29 @@
{
const string myTopic = "persistent://public/default/mytopic";
- var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, args) =>
{
- taskCompletionSource.SetResult();
+ cts.Cancel();
args.Cancel = true;
};
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
- var consumer = client.NewConsumer()
+ await using var consumer = client.NewConsumer()
.StateChangedHandler(Monitor)
.SubscriptionName("MySubscription")
.Topic(myTopic)
.Create();
- var cts = new CancellationTokenSource();
-
- var consuming = ConsumeMessages(consumer, cts.Token);
-
Console.WriteLine("Press Ctrl+C to exit");
- await taskCompletionSource.Task;
-
- cts.Cancel();
-
- await consuming;
-
- await consumer.DisposeAsync();
+ await ConsumeMessages(consumer, cts.Token);
}
private static async Task ConsumeMessages(IConsumer consumer, CancellationToken cancellationToken)
{
- await Task.Yield();
-
try
{
await foreach (var message in consumer.Messages(cancellationToken))
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 3eb4aae..ceb74a5 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -28,40 +28,28 @@
{
const string myTopic = "persistent://public/default/mytopic";
- var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, args) =>
{
- taskCompletionSource.SetResult();
+ cts.Cancel();
args.Cancel = true;
};
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
- var producer = client.NewProducer()
+ await using var producer = client.NewProducer()
.StateChangedHandler(Monitor)
.Topic(myTopic)
.Create();
- var cts = new CancellationTokenSource();
-
- var producing = ProduceMessages(producer, cts.Token);
-
Console.WriteLine("Press Ctrl+C to exit");
- await taskCompletionSource.Task;
-
- cts.Cancel();
-
- await producing;
-
- await producer.DisposeAsync();
+ await ProduceMessages(producer, cts.Token);
}
private static async Task ProduceMessages(IProducer producer, CancellationToken cancellationToken)
{
- await Task.Yield();
-
var delay = TimeSpan.FromSeconds(5);
try
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 5bd6099..b867e4a 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -29,41 +29,29 @@
{
const string myTopic = "persistent://public/default/mytopic";
- var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+ var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, args) =>
{
- taskCompletionSource.SetResult();
+ cts.Cancel();
args.Cancel = true;
};
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
- var reader = client.NewReader()
+ await using var reader = client.NewReader()
.StartMessageId(MessageId.Earliest)
.StateChangedHandler(Monitor)
.Topic(myTopic)
.Create();
- var cts = new CancellationTokenSource();
-
- var reading = ReadMessages(reader, cts.Token);
-
Console.WriteLine("Press Ctrl+C to exit");
- await taskCompletionSource.Task;
-
- cts.Cancel();
-
- await reading;
-
- await reader.DisposeAsync();
+ await ReadMessages(reader, cts.Token);
}
private static async Task ReadMessages(IReader reader, CancellationToken cancellationToken)
{
- await Task.Yield();
-
try
{
await foreach (var message in reader.Messages(cancellationToken))
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 0fc4558..4db3197 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0</TargetFrameworks>
- <Version>0.9.7</Version>
+ <Version>0.10.0</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>DanskeCommodities;dblank</Authors>
diff --git a/src/DotPulsar/Internal/MonitorState.cs b/src/DotPulsar/Internal/MonitorState.cs
index aafe349..28e0f9a 100644
--- a/src/DotPulsar/Internal/MonitorState.cs
+++ b/src/DotPulsar/Internal/MonitorState.cs
@@ -18,7 +18,6 @@
using System.Threading.Tasks;
public static class StateMonitor
-
{
public static async Task MonitorProducer(IProducer producer, IHandleStateChanged<ProducerStateChanged> handler)
{