Adding new exceptions.
Adding new way of monitoring reader, consumer and producer state.
Updating NuGet packages.
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 3bc70e9..914fc8a 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -40,12 +40,11 @@
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
var consumer = client.NewConsumer()
+ .StateChangedHandler(Monitor)
.SubscriptionName("MySubscription")
.Topic(myTopic)
.Create();
- var monitoring = Monitor(consumer);
-
var cts = new CancellationTokenSource();
var consuming = ConsumeMessages(consumer, cts.Token);
@@ -59,8 +58,6 @@
await consuming;
await consumer.DisposeAsync();
-
- await monitoring;
}
private static async Task ConsumeMessages(IConsumer consumer, CancellationToken cancellationToken)
@@ -79,18 +76,9 @@
catch (OperationCanceledException) { }
}
- private static async Task Monitor(IConsumer consumer)
+ private static void Monitor(ConsumerStateChanged stateChanged, CancellationToken cancellationToken)
{
- await Task.Yield();
-
- var state = ConsumerState.Disconnected;
-
- while (true)
- {
- var stateChanged = await consumer.StateChangedFrom(state);
- state = stateChanged.ConsumerState;
-
- var stateMessage = state switch
+ var stateMessage = stateChanged.ConsumerState switch
{
ConsumerState.Active => "is active",
ConsumerState.Inactive => "is inactive",
@@ -98,15 +86,11 @@
ConsumerState.Closed => "has closed",
ConsumerState.ReachedEndOfTopic => "has reached end of topic",
ConsumerState.Faulted => "has faulted",
- _ => $"has an unknown state '{state}'"
+ _ => $"has an unknown state '{stateChanged.ConsumerState}'"
};
var topic = stateChanged.Consumer.Topic;
Console.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
-
- if (consumer.IsFinalState(state))
- return;
- }
}
}
}
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 7fe7d58..26f7f51 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -39,11 +39,10 @@
await using var client = PulsarClient.Builder().Build(); //Connecting to pulsar://localhost:6650
var producer = client.NewProducer()
+ .StateChangedHandler(Monitor)
.Topic(myTopic)
.Create();
- var monitoring = Monitor(producer);
-
var cts = new CancellationTokenSource();
var producing = ProduceMessages(producer, cts.Token);
@@ -57,8 +56,6 @@
await producing;
await producer.DisposeAsync();
-
- await monitoring;
}
private static async Task ProduceMessages(IProducer producer, CancellationToken cancellationToken)
@@ -82,32 +79,19 @@
{ }
}
- private static async Task Monitor(IProducer producer)
+ private static void Monitor(ProducerStateChanged stateChanged, CancellationToken cancellationToken)
{
- await Task.Yield();
-
- var state = ProducerState.Disconnected;
-
- while (true)
+ var stateMessage = stateChanged.ProducerState switch
{
- var stateChanged = await producer.StateChangedFrom(state).ConfigureAwait(false);
- state = stateChanged.ProducerState;
+ ProducerState.Connected => "is connected",
+ ProducerState.Disconnected => "is disconnected",
+ ProducerState.Closed => "has closed",
+ ProducerState.Faulted => "has faulted",
+ _ => $"has an unknown state '{stateChanged.ProducerState}'"
+ };
- var stateMessage = state switch
- {
- ProducerState.Connected => "is connected",
- ProducerState.Disconnected => "is disconnected",
- ProducerState.Closed => "has closed",
- ProducerState.Faulted => "has faulted",
- _ => $"has an unknown state '{state}'"
- };
-
- var topic = stateChanged.Producer.Topic;
- Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
-
- if (producer.IsFinalState(state))
- return;
- }
+ var topic = stateChanged.Producer.Topic;
+ Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
}
}
}
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index bb29458..26289d3 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -41,11 +41,10 @@
var reader = client.NewReader()
.StartMessageId(MessageId.Earliest)
+ .StateChangedHandler(Monitor)
.Topic(myTopic)
.Create();
- var monitoring = Monitor(reader);
-
var cts = new CancellationTokenSource();
var reading = ReadMessages(reader, cts.Token);
@@ -59,8 +58,6 @@
await reading;
await reader.DisposeAsync();
-
- await monitoring;
}
private static async Task ReadMessages(IReader reader, CancellationToken cancellationToken)
@@ -78,33 +75,20 @@
catch (OperationCanceledException) { }
}
- private static async Task Monitor(IReader reader)
+ private static void Monitor(ReaderStateChanged stateChanged, CancellationToken cancellationToken)
{
- await Task.Yield();
-
- var state = ReaderState.Disconnected;
-
- while (true)
+ var stateMessage = stateChanged.ReaderState switch
{
- var stateChanged = await reader.StateChangedFrom(state);
- state = stateChanged.ReaderState;
+ ReaderState.Connected => "is connected",
+ ReaderState.Disconnected => "is disconnected",
+ ReaderState.Closed => "has closed",
+ ReaderState.ReachedEndOfTopic => "has reached end of topic",
+ ReaderState.Faulted => "has faulted",
+ _ => $"has an unknown state '{stateChanged.ReaderState}'"
+ };
- var stateMessage = state switch
- {
- ReaderState.Connected => "is connected",
- ReaderState.Disconnected => "is disconnected",
- ReaderState.Closed => "has closed",
- ReaderState.ReachedEndOfTopic => "has reached end of topic",
- ReaderState.Faulted => "has faulted",
- _ => $"has an unknown state '{state}'"
- };
-
- var topic = stateChanged.Reader.Topic;
- Console.WriteLine($"The reader for topic '{topic}' " + stateMessage);
-
- if (reader.IsFinalState(state))
- return;
- }
+ var topic = stateChanged.Reader.Topic;
+ Console.WriteLine($"The reader for topic '{topic}' " + stateMessage);
}
}
}
diff --git a/src/DotPulsar/Abstractions/IConsumerBuilder.cs b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
index 44fd068..4de94f1 100644
--- a/src/DotPulsar/Abstractions/IConsumerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IConsumerBuilder.cs
@@ -14,6 +14,10 @@
namespace DotPulsar.Abstractions
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
/// <summary>
/// A consumer building abstraction.
/// </summary>
@@ -30,21 +34,36 @@
IConsumerBuilder InitialPosition(SubscriptionInitialPosition initialPosition);
/// <summary>
- /// Set the priority level for the shared subscription consumer. The default is 0.
- /// </summary>
- IConsumerBuilder PriorityLevel(int priorityLevel);
-
- /// <summary>
/// Number of messages that will be prefetched. The default is 1000.
/// </summary>
IConsumerBuilder MessagePrefetchCount(uint count);
/// <summary>
+ /// Set the priority level for the shared subscription consumer. The default is 0.
+ /// </summary>
+ IConsumerBuilder PriorityLevel(int priorityLevel);
+
+ /// <summary>
/// Whether to read from the compacted topic. The default is 'false'.
/// </summary>
IConsumerBuilder ReadCompacted(bool readCompacted);
/// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ IConsumerBuilder StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler);
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ IConsumerBuilder StateChangedHandler(Action<ConsumerStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ IConsumerBuilder StateChangedHandler(Func<ConsumerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
+
+ /// <summary>
/// Set the subscription name for this consumer. This is required.
/// </summary>
IConsumerBuilder SubscriptionName(string name);
diff --git a/src/DotPulsar/Abstractions/IHandleStateChanged.cs b/src/DotPulsar/Abstractions/IHandleStateChanged.cs
new file mode 100644
index 0000000..04d6b60
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IHandleStateChanged.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// An state change handling abstraction.
+ /// </summary>
+ public interface IHandleStateChanged<TStateChanged>
+ {
+ /// <summary>
+ /// Called after a state has changed.
+ /// </summary>
+ ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken cancellationToken);
+
+ /// <summary>
+ /// The cancellation token to use when waiting for and handling state changes.
+ /// </summary>
+ CancellationToken CancellationToken { get; }
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IProducerBuilder.cs b/src/DotPulsar/Abstractions/IProducerBuilder.cs
index 386e09f..2cdc6ef 100644
--- a/src/DotPulsar/Abstractions/IProducerBuilder.cs
+++ b/src/DotPulsar/Abstractions/IProducerBuilder.cs
@@ -14,20 +14,39 @@
namespace DotPulsar.Abstractions
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
/// <summary>
/// A producer building abstraction.
/// </summary>
public interface IProducerBuilder
{
/// <summary>
+ /// Set the initial sequence id. The default is 0.
+ /// </summary>
+ IProducerBuilder InitialSequenceId(ulong initialSequenceId);
+
+ /// <summary>
/// Set the producer name. This is optional.
/// </summary>
IProducerBuilder ProducerName(string name);
/// <summary>
- /// Set the initial sequence id. The default is 0.
+ /// Register a state changed handler.
/// </summary>
- IProducerBuilder InitialSequenceId(ulong initialSequenceId);
+ IProducerBuilder StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler);
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ IProducerBuilder StateChangedHandler(Action<ProducerStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ IProducerBuilder StateChangedHandler(Func<ProducerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
/// <summary>
/// Set the topic for this producer. This is required.
diff --git a/src/DotPulsar/Abstractions/IReaderBuilder.cs b/src/DotPulsar/Abstractions/IReaderBuilder.cs
index 4363697..3e97705 100644
--- a/src/DotPulsar/Abstractions/IReaderBuilder.cs
+++ b/src/DotPulsar/Abstractions/IReaderBuilder.cs
@@ -14,17 +14,16 @@
namespace DotPulsar.Abstractions
{
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
/// <summary>
/// A reader building abstraction.
/// </summary>
public interface IReaderBuilder
{
/// <summary>
- /// Set the reader name. This is optional.
- /// </summary>
- IReaderBuilder ReaderName(string name);
-
- /// <summary>
/// Number of messages that will be prefetched. The default is 1000.
/// </summary>
IReaderBuilder MessagePrefetchCount(uint count);
@@ -35,11 +34,31 @@
IReaderBuilder ReadCompacted(bool readCompacted);
/// <summary>
+ /// Set the reader name. This is optional.
+ /// </summary>
+ IReaderBuilder ReaderName(string name);
+
+ /// <summary>
/// The initial reader position is set to the specified message id. This is required.
/// </summary>
IReaderBuilder StartMessageId(MessageId messageId);
/// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ IReaderBuilder StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler);
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ IReaderBuilder StateChangedHandler(Action<ReaderStateChanged, CancellationToken> handler, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Register a state changed handler.
+ /// </summary>
+ IReaderBuilder StateChangedHandler(Func<ReaderStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken = default);
+
+ /// <summary>
/// Set the topic for this reader. This is required.
/// </summary>
IReaderBuilder Topic(string topic);
diff --git a/src/DotPulsar/ConsumerOptions.cs b/src/DotPulsar/ConsumerOptions.cs
index a07d529..3d12d9f 100644
--- a/src/DotPulsar/ConsumerOptions.cs
+++ b/src/DotPulsar/ConsumerOptions.cs
@@ -14,6 +14,8 @@
namespace DotPulsar
{
+ using DotPulsar.Abstractions;
+
/// <summary>
/// The consumer building options.
/// </summary>
@@ -66,21 +68,26 @@
public SubscriptionInitialPosition InitialPosition { get; set; }
/// <summary>
- /// Set the priority level for the shared subscription consumer. The default is 0.
- /// </summary>
- public int PriorityLevel { get; set; }
-
- /// <summary>
/// Number of messages that will be prefetched. The default is 1000.
/// </summary>
public uint MessagePrefetchCount { get; set; }
/// <summary>
+ /// Set the priority level for the shared subscription consumer. The default is 0.
+ /// </summary>
+ public int PriorityLevel { get; set; }
+
+ /// <summary>
/// Whether to read from the compacted topic. The default is 'false'.
/// </summary>
public bool ReadCompacted { get; set; }
/// <summary>
+ /// Register a state changed handler. This is optional.
+ /// </summary>
+ public IHandleStateChanged<ConsumerStateChanged>? StateChangedHandler { get; set; }
+
+ /// <summary>
/// Set the subscription name for this consumer. This is required.
/// </summary>
public string SubscriptionName { get; set; }
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 9dcf9af..0fc4558 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -21,10 +21,10 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.0" />
+ <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="5.0.1" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="protobuf-net" Version="3.0.73" />
- <PackageReference Include="System.IO.Pipelines" Version="5.0.0" />
+ <PackageReference Include="System.IO.Pipelines" Version="5.0.1" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
diff --git a/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs b/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
new file mode 100644
index 0000000..e106f5e
--- /dev/null
+++ b/src/DotPulsar/Exceptions/InvalidTransactionStatusException.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+ using System;
+
+ public sealed class InvalidTransactionStatusException : DotPulsarException
+ {
+ public InvalidTransactionStatusException(string message) : base(message) { }
+
+ public InvalidTransactionStatusException(string message, Exception innerException) : base(message, innerException) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/NotAllowedErrorException.cs b/src/DotPulsar/Exceptions/NotAllowedErrorException.cs
new file mode 100644
index 0000000..d92e2aa
--- /dev/null
+++ b/src/DotPulsar/Exceptions/NotAllowedErrorException.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+ using System;
+
+ public sealed class NotAllowedErrorException : DotPulsarException
+ {
+ public NotAllowedErrorException(string message) : base(message) { }
+
+ public NotAllowedErrorException(string message, Exception innerException) : base(message, innerException) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/TransactionConflictException.cs b/src/DotPulsar/Exceptions/TransactionConflictException.cs
new file mode 100644
index 0000000..c358d39
--- /dev/null
+++ b/src/DotPulsar/Exceptions/TransactionConflictException.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+ using System;
+
+ public sealed class TransactionConflictException : DotPulsarException
+ {
+ public TransactionConflictException(string message) : base(message) { }
+
+ public TransactionConflictException(string message, Exception innerException) : base(message, innerException) { }
+ }
+}
diff --git a/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs b/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
new file mode 100644
index 0000000..33d9158
--- /dev/null
+++ b/src/DotPulsar/Exceptions/TransactionCoordinatorNotFoundException.cs
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Exceptions
+{
+ using System;
+
+ public sealed class TransactionCoordinatorNotFoundException : DotPulsarException
+ {
+ public TransactionCoordinatorNotFoundException(string message) : base(message) { }
+
+ public TransactionCoordinatorNotFoundException(string message, Exception innerException) : base(message, innerException) { }
+ }
+}
diff --git a/src/DotPulsar/Internal/ActionStateChangedHandler.cs b/src/DotPulsar/Internal/ActionStateChangedHandler.cs
new file mode 100644
index 0000000..804d7b3
--- /dev/null
+++ b/src/DotPulsar/Internal/ActionStateChangedHandler.cs
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+ using DotPulsar.Abstractions;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ public sealed class ActionStateChangedHandler<TStateChanged> : IHandleStateChanged<TStateChanged>
+ {
+ private readonly Action<TStateChanged, CancellationToken> _stateChangedHandler;
+
+ public ActionStateChangedHandler(Action<TStateChanged, CancellationToken> stateChangedHandler, CancellationToken cancellationToken)
+ {
+ _stateChangedHandler = stateChangedHandler;
+ CancellationToken = cancellationToken;
+ }
+
+ public CancellationToken CancellationToken { get; }
+
+ public ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken cancellationToken)
+ {
+ _stateChangedHandler(stateChanged, CancellationToken);
+ return new ValueTask();
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
index edd856b..9fc42c3 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -105,7 +105,6 @@
else
await sslStream.DisposeAsync().ConfigureAwait(false);
#endif
-
throw;
}
}
diff --git a/src/DotPulsar/Internal/ConsumerBuilder.cs b/src/DotPulsar/Internal/ConsumerBuilder.cs
index 15b2875..6deee54 100644
--- a/src/DotPulsar/Internal/ConsumerBuilder.cs
+++ b/src/DotPulsar/Internal/ConsumerBuilder.cs
@@ -16,6 +16,9 @@
{
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
public sealed class ConsumerBuilder : IConsumerBuilder
{
@@ -28,6 +31,7 @@
private string? _subscriptionName;
private SubscriptionType _subscriptionType;
private string? _topic;
+ private IHandleStateChanged<ConsumerStateChanged>? _stateChangedHandler;
public ConsumerBuilder(IPulsarClient pulsarClient)
{
@@ -51,24 +55,42 @@
return this;
}
- public IConsumerBuilder PriorityLevel(int priorityLevel)
- {
- _priorityLevel = priorityLevel;
- return this;
- }
-
public IConsumerBuilder MessagePrefetchCount(uint count)
{
_messagePrefetchCount = count;
return this;
}
+ public IConsumerBuilder PriorityLevel(int priorityLevel)
+ {
+ _priorityLevel = priorityLevel;
+ return this;
+ }
+
public IConsumerBuilder ReadCompacted(bool readCompacted)
{
_readCompacted = readCompacted;
return this;
}
+ public IConsumerBuilder StateChangedHandler(IHandleStateChanged<ConsumerStateChanged> handler)
+ {
+ _stateChangedHandler = handler;
+ return this;
+ }
+
+ public IConsumerBuilder StateChangedHandler(Action<ConsumerStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
+ {
+ _stateChangedHandler = new ActionStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken);
+ return this;
+ }
+
+ public IConsumerBuilder StateChangedHandler(Func<ConsumerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
+ {
+ _stateChangedHandler = new FuncStateChangedHandler<ConsumerStateChanged>(handler, cancellationToken);
+ return this;
+ }
+
public IConsumerBuilder SubscriptionName(string name)
{
_subscriptionName = name;
@@ -102,6 +124,7 @@
MessagePrefetchCount = _messagePrefetchCount,
PriorityLevel = _priorityLevel,
ReadCompacted = _readCompacted,
+ StateChangedHandler = _stateChangedHandler,
SubscriptionType = _subscriptionType
};
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index 4b182dc..6df4e0f 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -54,7 +54,9 @@
ServerError.ConsumerNotFound => new ConsumerNotFoundException(message),
ServerError.IncompatibleSchema => new IncompatibleSchemaException(message),
ServerError.InvalidTopicName => new InvalidTopicNameException(message),
+ ServerError.InvalidTxnStatus => new InvalidTransactionStatusException(message),
ServerError.MetadataError => new MetadataException(message),
+ ServerError.NotAllowedError => new NotAllowedErrorException(message),
ServerError.PersistenceError => new PersistenceException(message),
ServerError.ProducerBlockedQuotaExceededError => new ProducerBlockedQuotaExceededException($"{message}. Error code: {error}"),
ServerError.ProducerBlockedQuotaExceededException => new ProducerBlockedQuotaExceededException($"{message}. Error code: {error}"),
@@ -64,8 +66,10 @@
ServerError.TooManyRequests => new TooManyRequestsException(message),
ServerError.TopicNotFound => new TopicNotFoundException(message),
ServerError.TopicTerminatedError => new TopicTerminatedException(message),
- ServerError.UnsupportedVersionError => new UnsupportedVersionException(message),
+ ServerError.TransactionConflict => new TransactionConflictException(message),
+ ServerError.TransactionCoordinatorNotFound => new TransactionCoordinatorNotFoundException(message),
ServerError.UnknownError => new UnknownException($"{message}. Error code: {error}"),
+ ServerError.UnsupportedVersionError => new UnsupportedVersionException(message),
_ => new UnknownException($"{message}. Error code: {error}")
});
diff --git a/src/DotPulsar/Internal/FuncStateChangedHandler.cs b/src/DotPulsar/Internal/FuncStateChangedHandler.cs
new file mode 100644
index 0000000..ab97199
--- /dev/null
+++ b/src/DotPulsar/Internal/FuncStateChangedHandler.cs
@@ -0,0 +1,37 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+ using DotPulsar.Abstractions;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ public sealed class FuncStateChangedHandler<TStateChanged> : IHandleStateChanged<TStateChanged>
+ {
+ private readonly Func<TStateChanged, CancellationToken, ValueTask> _stateChangedHandler;
+
+ public FuncStateChangedHandler(Func<TStateChanged, CancellationToken, ValueTask> stateChangedHandler, CancellationToken cancellationToken)
+ {
+ _stateChangedHandler = stateChangedHandler;
+ CancellationToken = cancellationToken;
+ }
+
+ public CancellationToken CancellationToken { get; }
+
+ public async ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken cancellationToken)
+ => await _stateChangedHandler(stateChanged, cancellationToken).ConfigureAwait(false);
+ }
+}
diff --git a/src/DotPulsar/Internal/MonitorState.cs b/src/DotPulsar/Internal/MonitorState.cs
new file mode 100644
index 0000000..aafe349
--- /dev/null
+++ b/src/DotPulsar/Internal/MonitorState.cs
@@ -0,0 +1,86 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+ using DotPulsar.Abstractions;
+ using System.Threading.Tasks;
+
+ public static class StateMonitor
+
+ {
+ public static async Task MonitorProducer(IProducer producer, IHandleStateChanged<ProducerStateChanged> handler)
+ {
+ await Task.Yield();
+
+ var state = ProducerState.Disconnected;
+
+ while (!producer.IsFinalState(state))
+ {
+ var stateChanged = await producer.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
+ state = stateChanged.ProducerState;
+ try
+ {
+ await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
+ }
+
+ public static async Task MonitorConsumer(IConsumer consumer, IHandleStateChanged<ConsumerStateChanged> handler)
+ {
+ await Task.Yield();
+
+ var state = ConsumerState.Disconnected;
+
+ while (!consumer.IsFinalState(state))
+ {
+ var stateChanged = await consumer.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
+ state = stateChanged.ConsumerState;
+ try
+ {
+ await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
+ }
+
+ public static async Task MonitorReader(IReader reader, IHandleStateChanged<ReaderStateChanged> handler)
+ {
+ await Task.Yield();
+
+ var state = ReaderState.Disconnected;
+
+ while (!reader.IsFinalState(state))
+ {
+ var stateChanged = await reader.StateChangedFrom(state, handler.CancellationToken).ConfigureAwait(false);
+ state = stateChanged.ReaderState;
+ try
+ {
+ await handler.OnStateChanged(stateChanged, handler.CancellationToken).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/ProducerBuilder.cs b/src/DotPulsar/Internal/ProducerBuilder.cs
index 8916cd0..941c0aa 100644
--- a/src/DotPulsar/Internal/ProducerBuilder.cs
+++ b/src/DotPulsar/Internal/ProducerBuilder.cs
@@ -16,6 +16,9 @@
{
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
public sealed class ProducerBuilder : IProducerBuilder
{
@@ -23,6 +26,7 @@
private string? _producerName;
private ulong _initialSequenceId;
private string? _topic;
+ private IHandleStateChanged<ProducerStateChanged>? _stateChangedHandler;
public ProducerBuilder(IPulsarClient pulsarClient)
{
@@ -30,15 +34,33 @@
_initialSequenceId = ProducerOptions.DefaultInitialSequenceId;
}
+ public IProducerBuilder InitialSequenceId(ulong initialSequenceId)
+ {
+ _initialSequenceId = initialSequenceId;
+ return this;
+ }
+
public IProducerBuilder ProducerName(string name)
{
_producerName = name;
return this;
}
- public IProducerBuilder InitialSequenceId(ulong initialSequenceId)
+ public IProducerBuilder StateChangedHandler(IHandleStateChanged<ProducerStateChanged> handler)
{
- _initialSequenceId = initialSequenceId;
+ _stateChangedHandler = handler;
+ return this;
+ }
+
+ public IProducerBuilder StateChangedHandler(Action<ProducerStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
+ {
+ _stateChangedHandler = new ActionStateChangedHandler<ProducerStateChanged>(handler, cancellationToken);
+ return this;
+ }
+
+ public IProducerBuilder StateChangedHandler(Func<ProducerStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
+ {
+ _stateChangedHandler = new FuncStateChangedHandler<ProducerStateChanged>(handler, cancellationToken);
return this;
}
@@ -56,7 +78,8 @@
var options = new ProducerOptions(_topic!)
{
InitialSequenceId = _initialSequenceId,
- ProducerName = _producerName
+ ProducerName = _producerName,
+ StateChangedHandler = _stateChangedHandler
};
return _pulsarClient.CreateProducer(options);
diff --git a/src/DotPulsar/Internal/ReaderBuilder.cs b/src/DotPulsar/Internal/ReaderBuilder.cs
index 41ff928..4936cd0 100644
--- a/src/DotPulsar/Internal/ReaderBuilder.cs
+++ b/src/DotPulsar/Internal/ReaderBuilder.cs
@@ -16,6 +16,9 @@
{
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
public sealed class ReaderBuilder : IReaderBuilder
{
@@ -25,6 +28,7 @@
private bool _readCompacted;
private MessageId? _startMessageId;
private string? _topic;
+ private IHandleStateChanged<ReaderStateChanged>? _stateChangedHandler;
public ReaderBuilder(IPulsarClient pulsarClient)
{
@@ -33,12 +37,6 @@
_readCompacted = ReaderOptions.DefaultReadCompacted;
}
- public IReaderBuilder ReaderName(string name)
- {
- _readerName = name;
- return this;
- }
-
public IReaderBuilder MessagePrefetchCount(uint count)
{
_messagePrefetchCount = count;
@@ -51,12 +49,36 @@
return this;
}
+ public IReaderBuilder ReaderName(string name)
+ {
+ _readerName = name;
+ return this;
+ }
+
public IReaderBuilder StartMessageId(MessageId messageId)
{
_startMessageId = messageId;
return this;
}
+ public IReaderBuilder StateChangedHandler(IHandleStateChanged<ReaderStateChanged> handler)
+ {
+ _stateChangedHandler = handler;
+ return this;
+ }
+
+ public IReaderBuilder StateChangedHandler(Action<ReaderStateChanged, CancellationToken> handler, CancellationToken cancellationToken)
+ {
+ _stateChangedHandler = new ActionStateChangedHandler<ReaderStateChanged>(handler, cancellationToken);
+ return this;
+ }
+
+ public IReaderBuilder StateChangedHandler(Func<ReaderStateChanged, CancellationToken, ValueTask> handler, CancellationToken cancellationToken)
+ {
+ _stateChangedHandler = new FuncStateChangedHandler<ReaderStateChanged>(handler, cancellationToken);
+ return this;
+ }
+
public IReaderBuilder Topic(string topic)
{
_topic = topic;
@@ -75,7 +97,8 @@
{
MessagePrefetchCount = _messagePrefetchCount,
ReadCompacted = _readCompacted,
- ReaderName = _readerName
+ ReaderName = _readerName,
+ StateChangedHandler = _stateChangedHandler
};
return _pulsarClient.CreateReader(options);
diff --git a/src/DotPulsar/ProducerOptions.cs b/src/DotPulsar/ProducerOptions.cs
index 906b72d..7002742 100644
--- a/src/DotPulsar/ProducerOptions.cs
+++ b/src/DotPulsar/ProducerOptions.cs
@@ -14,6 +14,8 @@
namespace DotPulsar
{
+ using DotPulsar.Abstractions;
+
/// <summary>
/// The producer building options.
/// </summary>
@@ -31,14 +33,19 @@
}
/// <summary>
+ /// Set the initial sequence id. The default is 0.
+ /// </summary>
+ public ulong InitialSequenceId { get; set; }
+
+ /// <summary>
/// Set the producer name. This is optional.
/// </summary>
public string? ProducerName { get; set; }
/// <summary>
- /// Set the initial sequence id. The default is 0.
+ /// Register a state changed handler. This is optional.
/// </summary>
- public ulong InitialSequenceId { get; set; }
+ public IHandleStateChanged<ProducerStateChanged>? StateChangedHandler { get; set; }
/// <summary>
/// Set the topic for this producer. This is required.
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 982ee3b..e2c0577 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -58,6 +58,8 @@
var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
var producer = new Producer(correlationId, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
+ if (options.StateChangedHandler is not null)
+ _ = StateMonitor.MonitorProducer(producer, options.StateChangedHandler);
var process = new ProducerProcess(correlationId, stateManager, factory, producer);
_processManager.Add(process);
process.Start();
@@ -73,9 +75,10 @@
var correlationId = Guid.NewGuid();
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ConsumerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
-
var stateManager = new StateManager<ConsumerState>(ConsumerState.Disconnected, ConsumerState.Closed, ConsumerState.ReachedEndOfTopic, ConsumerState.Faulted);
var consumer = new Consumer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+ if (options.StateChangedHandler is not null)
+ _ = StateMonitor.MonitorConsumer(consumer, options.StateChangedHandler);
var process = new ConsumerProcess(correlationId, stateManager, factory, consumer, options.SubscriptionType == SubscriptionType.Failover);
_processManager.Add(process);
process.Start();
@@ -93,6 +96,8 @@
var factory = new ReaderChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
var stateManager = new StateManager<ReaderState>(ReaderState.Disconnected, ReaderState.Closed, ReaderState.ReachedEndOfTopic, ReaderState.Faulted);
var reader = new Reader(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+ if (options.StateChangedHandler is not null)
+ _ = StateMonitor.MonitorReader(reader, options.StateChangedHandler);
var process = new ReaderProcess(correlationId, stateManager, factory, reader);
_processManager.Add(process);
process.Start();
diff --git a/src/DotPulsar/ReaderOptions.cs b/src/DotPulsar/ReaderOptions.cs
index 4021087..3baa295 100644
--- a/src/DotPulsar/ReaderOptions.cs
+++ b/src/DotPulsar/ReaderOptions.cs
@@ -14,6 +14,8 @@
namespace DotPulsar
{
+ using DotPulsar.Abstractions;
+
/// <summary>
/// The reader building options.
/// </summary>
@@ -38,11 +40,6 @@
}
/// <summary>
- /// Set the reader name. This is optional.
- /// </summary>
- public string? ReaderName { get; set; }
-
- /// <summary>
/// Number of messages that will be prefetched. The default is 1000.
/// </summary>
public uint MessagePrefetchCount { get; set; }
@@ -53,11 +50,21 @@
public bool ReadCompacted { get; set; }
/// <summary>
+ /// Set the reader name. This is optional.
+ /// </summary>
+ public string? ReaderName { get; set; }
+
+ /// <summary>
/// The initial reader position is set to the specified message id. This is required.
/// </summary>
public MessageId StartMessageId { get; set; }
/// <summary>
+ /// Register a state changed handler. This is optional.
+ /// </summary>
+ public IHandleStateChanged<ReaderStateChanged>? StateChangedHandler { get; set; }
+
+ /// <summary>
/// Set the topic for this reader. This is required.
/// </summary>
public string Topic { get; set; }