Don't wait for shutdown using Console.ReadKey(). Using TaskCOmpletionSource instead.
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index d09ae3f..3bc70e9 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -29,6 +29,14 @@
{
const string myTopic = "persistent://public/default/mytopic";
+ var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ Console.CancelKeyPress += (sender, args) =>
+ {
+ taskCompletionSource.SetResult(null);
+ args.Cancel = true;
+ };
+
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
var consumer = client.NewConsumer()
@@ -42,17 +50,17 @@
var consuming = ConsumeMessages(consumer, cts.Token);
- Console.WriteLine("Press a key to exit");
+ Console.WriteLine("Press Ctrl+C to exit");
- _ = Console.ReadKey();
+ await taskCompletionSource.Task;
cts.Cancel();
- await consuming.ConfigureAwait(false);
+ await consuming;
- await consumer.DisposeAsync().ConfigureAwait(false);
+ await consumer.DisposeAsync();
- await monitoring.ConfigureAwait(false);
+ await monitoring;
}
private static async Task ConsumeMessages(IConsumer consumer, CancellationToken cancellationToken)
@@ -65,7 +73,7 @@
{
var data = Encoding.UTF8.GetString(message.Data.ToArray());
Console.WriteLine("Received: " + data);
- await consumer.Acknowledge(message, cancellationToken).ConfigureAwait(false);
+ await consumer.Acknowledge(message, cancellationToken);
}
}
catch (OperationCanceledException) { }
@@ -79,7 +87,7 @@
while (true)
{
- var stateChanged = await consumer.StateChangedFrom(state).ConfigureAwait(false);
+ var stateChanged = await consumer.StateChangedFrom(state);
state = stateChanged.ConsumerState;
var stateMessage = state switch
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 8db5b3d..7fe7d58 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -28,9 +28,19 @@
{
const string myTopic = "persistent://public/default/mytopic";
+ var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ Console.CancelKeyPress += (sender, args) =>
+ {
+ taskCompletionSource.SetResult(null);
+ args.Cancel = true;
+ };
+
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
- var producer = client.NewProducer().Topic(myTopic).Create();
+ var producer = client.NewProducer()
+ .Topic(myTopic)
+ .Create();
var monitoring = Monitor(producer);
@@ -38,17 +48,17 @@
var producing = ProduceMessages(producer, cts.Token);
- Console.WriteLine("Press a key to exit");
+ Console.WriteLine("Press Ctrl+C to exit");
- _ = Console.ReadKey();
+ await taskCompletionSource.Task;
cts.Cancel();
- await producing.ConfigureAwait(false);
+ await producing;
- await producer.DisposeAsync().ConfigureAwait(false);
+ await producer.DisposeAsync();
- await monitoring.ConfigureAwait(false);
+ await monitoring;
}
private static async Task ProduceMessages(IProducer producer, CancellationToken cancellationToken)
@@ -63,9 +73,9 @@
{
var data = DateTime.UtcNow.ToLongTimeString();
var bytes = Encoding.UTF8.GetBytes(data);
- _ = await producer.Send(bytes, cancellationToken).ConfigureAwait(false);
+ _ = await producer.Send(bytes, cancellationToken);
Console.WriteLine("Sent: " + data);
- await Task.Delay(delay).ConfigureAwait(false);
+ await Task.Delay(delay, cancellationToken);
}
}
catch (OperationCanceledException) // If not using the cancellationToken, then just dispose the producer and catch ObjectDisposedException instead
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index 0769cdc..bb29458 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -29,6 +29,14 @@
{
const string myTopic = "persistent://public/default/mytopic";
+ var taskCompletionSource = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ Console.CancelKeyPress += (sender, args) =>
+ {
+ taskCompletionSource.SetResult(null);
+ args.Cancel = true;
+ };
+
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
var reader = client.NewReader()
@@ -42,17 +50,17 @@
var reading = ReadMessages(reader, cts.Token);
- Console.WriteLine("Press a key to exit");
+ Console.WriteLine("Press Ctrl+C to exit");
- _ = Console.ReadKey();
+ await taskCompletionSource.Task;
cts.Cancel();
- await reading.ConfigureAwait(false);
+ await reading;
- await reader.DisposeAsync().ConfigureAwait(false);
+ await reader.DisposeAsync();
- await monitoring.ConfigureAwait(false);
+ await monitoring;
}
private static async Task ReadMessages(IReader reader, CancellationToken cancellationToken)
@@ -78,7 +86,7 @@
while (true)
{
- var stateChanged = await reader.StateChangedFrom(state).ConfigureAwait(false);
+ var stateChanged = await reader.StateChangedFrom(state);
state = stateChanged.ReaderState;
var stateMessage = state switch