Make ready for release 3.0.1
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2dd7a71..1247972 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,16 +4,15 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [Unreleased]
+## [3.0.1] - 2023-09-15
### Changed
-- When calling GetLastMessageId(s) on a Reader or Consumer, it returns a MessageId without the topic field if
- MessageId.Earliest is found.
+- When calling GetLastMessageId(s) on a Reader or Consumer, it returns a MessageId without the topic field if MessageId.Earliest is found
### Fixed
-- Fixed issue with DotPulsar client not handling connection faults for consumers and readers.
+- Fixed issue with the DotPulsar client not handling connection faults for consumers and readers
## [3.0.0] - 2023-08-30
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 36c6bb4..0e8fbd3 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;net6.0;net7.0</TargetFrameworks>
- <Version>3.0.0</Version>
+ <Version>3.0.1</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
diff --git a/tests/DotPulsar.Tests/ConsumerTests.cs b/tests/DotPulsar.Tests/ConsumerTests.cs
index 4c63a51..6460dbe 100644
--- a/tests/DotPulsar.Tests/ConsumerTests.cs
+++ b/tests/DotPulsar.Tests/ConsumerTests.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -208,26 +208,15 @@
{
//Arrange
var semaphoreSlim = new SemaphoreSlim(1);
- await using var
- client = PulsarClient.Builder().ExceptionHandler(context =>
- {
- semaphoreSlim.WaitAsync();
- context.Result = FaultAction.Rethrow;
- context.ExceptionHandled = true;
- })
- .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a cluster that does not exists.
- .Build();
+ await using var client = PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ semaphoreSlim.WaitAsync();
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var consumer = client.NewConsumer(Schema.String)
- .StateChangedHandler(changed =>
- {
- var topic = changed.Consumer.Topic;
- var state = changed.ConsumerState;
- _testOutputHelper.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
- })
- .SubscriptionName("MySubscription")
- .Topic("persistent://public/default/mytopic")
- .Create();
+ await using var consumer = CreateConsumer(client, SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
var receiveTask = consumer.Receive().AsTask();
semaphoreSlim.Release();
@@ -243,29 +232,16 @@
public async Task Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
{
//Arrange
- var cts = new CancellationTokenSource();
+ await using var client = PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var
- client = PulsarClient.Builder().ExceptionHandler(context =>
- {
- context.Result = FaultAction.Rethrow;
- context.ExceptionHandled = true;
- })
- .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a cluster that does not exists.
- .Build();
+ await using var consumer = CreateConsumer(client, SubscriptionInitialPosition.Earliest, "persistent://a/b/c", "cn", "sn");
- await using var consumer = client.NewConsumer(Schema.String)
- .StateChangedHandler(changed =>
- {
- var topic = changed.Consumer.Topic;
- var state = changed.ConsumerState;
- _testOutputHelper.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
- })
- .SubscriptionName("MySubscription")
- .Topic("persistent://public/default/mytopic")
- .Create();
-
- await consumer.OnStateChangeTo(ConsumerState.Faulted, cts.Token);
+ await consumer.OnStateChangeTo(ConsumerState.Faulted);
//Act
var exception = await Record.ExceptionAsync(() => consumer.Receive().AsTask());
@@ -305,18 +281,30 @@
return messageIds;
}
- private IProducer<string> CreateProducer(IPulsarClient pulsarClient, string topicName) => pulsarClient.NewProducer(Schema.String)
+ private void LogState(ConsumerStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The consumer for topic '{stateChange.Consumer.Topic}' changed state to '{stateChange.ConsumerState}'");
+
+ private void LogState(ProducerStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+
+ private IProducer<string> CreateProducer(IPulsarClient pulsarClient, string topicName)
+ => pulsarClient.NewProducer(Schema.String)
.Topic(topicName)
+ .StateChangedHandler(LogState)
.Create();
- private IConsumer<string> CreateConsumer(IPulsarClient pulsarClient, SubscriptionInitialPosition subscriptionInitialPosition,
+ private IConsumer<string> CreateConsumer(
+ IPulsarClient pulsarClient,
+ SubscriptionInitialPosition subscriptionInitialPosition,
string topicName,
string consumerName,
- string subscriptionName) => pulsarClient.NewConsumer(Schema.String)
+ string subscriptionName)
+ => pulsarClient.NewConsumer(Schema.String)
.ConsumerName(consumerName)
.InitialPosition(subscriptionInitialPosition)
.SubscriptionName(subscriptionName)
.Topic(topicName)
+ .StateChangedHandler(LogState)
.Create();
private IPulsarClient CreateClient()
diff --git a/tests/DotPulsar.Tests/ReaderTests.cs b/tests/DotPulsar.Tests/ReaderTests.cs
index 6a595d6..a2c546f 100644
--- a/tests/DotPulsar.Tests/ReaderTests.cs
+++ b/tests/DotPulsar.Tests/ReaderTests.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -210,26 +210,15 @@
{
//Arrange
var semaphoreSlim = new SemaphoreSlim(1);
- await using var
- client = PulsarClient.Builder().ExceptionHandler(context =>
- {
- semaphoreSlim.WaitAsync();
- context.Result = FaultAction.Rethrow;
- context.ExceptionHandled = true;
- })
- .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a cluster that does not exists.
- .Build();
+ await using var client = PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ semaphoreSlim.WaitAsync();
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var reader = client.NewReader(Schema.String)
- .StartMessageId(MessageId.Earliest)
- .StateChangedHandler(changed =>
- {
- var topic = changed.Reader.Topic;
- var state = changed.ReaderState;
- _testOutputHelper.WriteLine($"The consumer for topic '{topic}' changed state to '{state}'");
- })
- .Topic("persistent://public/default/mytopic")
- .Create();
+ await using var reader = CreateReader(client, MessageId.Earliest, "persistent://a/b/c");
var receiveTask = reader.Receive().AsTask();
semaphoreSlim.Release();
@@ -245,29 +234,16 @@
public async Task Receive_WhenFaultedBeforeInvokingReceive_ShouldThrowConsumerFaultedException()
{
//Arrange
- var cts = new CancellationTokenSource();
+ await using var client = PulsarClient.Builder().ExceptionHandler(context =>
+ {
+ context.Result = FaultAction.Rethrow;
+ context.ExceptionHandled = true;
+ })
+ .ServiceUrl(new Uri("pulsar://nosuchhost")).Build();
- await using var
- client = PulsarClient.Builder().ExceptionHandler(context =>
- {
- context.Result = FaultAction.Rethrow;
- context.ExceptionHandled = true;
- })
- .ServiceUrl(new Uri("pulsar://localhost:9512")) //point to a cluster that does not exists.
- .Build();
+ await using var reader = CreateReader(client, MessageId.Earliest, "persistent://a/b/c");
- await using var reader = client.NewReader(Schema.String)
- .StartMessageId(MessageId.Earliest)
- .StateChangedHandler(changed =>
- {
- var topic = changed.Reader.Topic;
- var state = changed.ReaderState;
- _testOutputHelper.WriteLine($"The reader for topic '{topic}' changed state to '{state}'");
- })
- .Topic("persistent://public/default/mytopic")
- .Create();
-
- await reader.OnStateChangeTo(ReaderState.Faulted, cts.Token);
+ await reader.OnStateChangeTo(ReaderState.Faulted);
//Act
var exception = await Record.ExceptionAsync(() => reader.Receive().AsTask());
@@ -276,13 +252,21 @@
exception.Should().BeOfType<ReaderFaultedException>();
}
+ private void LogState(ReaderStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The reader for topic '{stateChange.Reader.Topic}' changed state to '{stateChange.ReaderState}'");
+
+ private void LogState(ProducerStateChanged stateChange)
+ => _testOutputHelper.WriteLine($"The producer for topic '{stateChange.Producer.Topic}' changed state to '{stateChange.ProducerState}'");
+
private IProducer<String> CreateProducer(IPulsarClient pulsarClient, string topicName) => pulsarClient.NewProducer(Schema.String)
.Topic(topicName)
+ .StateChangedHandler(LogState)
.Create();
private IReader<String> CreateReader(IPulsarClient pulsarClient, MessageId messageId, string topicName) => pulsarClient.NewReader(Schema.String)
.StartMessageId(messageId)
.Topic(topicName)
+ .StateChangedHandler(LogState)
.Create();
private IPulsarClient CreateClient()