When monitoring state we now return a class giving us both the new state and the Consumer/Producer/Reader.
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 59fe108..36f8497 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -79,20 +79,22 @@
while (true)
{
- state = await consumer.StateChangedFrom(state).ConfigureAwait(false);
+ var stateChanged = await consumer.StateChangedFrom(state).ConfigureAwait(false);
+ state = stateChanged.ConsumerState;
var stateMessage = state switch
{
- ConsumerState.Active => "The consumer is active",
- ConsumerState.Inactive => "The consumer is inactive",
- ConsumerState.Disconnected => "The consumer is disconnected",
- ConsumerState.Closed => "The consumer has closed",
- ConsumerState.ReachedEndOfTopic => "The consumer has reached end of topic",
- ConsumerState.Faulted => "The consumer has faulted",
- _ => $"The consumer has an unknown state '{state}'"
+ ConsumerState.Active => "is active",
+ ConsumerState.Inactive => "is inactive",
+ ConsumerState.Disconnected => "is disconnected",
+ ConsumerState.Closed => "has closed",
+ ConsumerState.ReachedEndOfTopic => "has reached end of topic",
+ ConsumerState.Faulted => "has faulted",
+ _ => $"has an unknown state '{state}'"
};
- Console.WriteLine(stateMessage);
+ var topic = stateChanged.Consumer.Topic;
+ Console.WriteLine($"The consumer for topic '{topic}' " + stateMessage);
if (consumer.IsFinalState(state))
return;
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 73ddff8..664793f 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -78,18 +78,20 @@
while (true)
{
- state = await producer.StateChangedFrom(state).ConfigureAwait(false);
+ var stateChanged = await producer.StateChangedFrom(state).ConfigureAwait(false);
+ state = stateChanged.ProducerState;
var stateMessage = state switch
{
- ProducerState.Connected => "The producer is connected",
- ProducerState.Disconnected => "The producer is disconnected",
- ProducerState.Closed => "The producer has closed",
- ProducerState.Faulted => "The producer has faulted",
- _ => $"The producer has an unknown state '{state}'"
+ ProducerState.Connected => "is connected",
+ ProducerState.Disconnected => "is disconnected",
+ ProducerState.Closed => "has closed",
+ ProducerState.Faulted => "has faulted",
+ _ => $"has an unknown state '{state}'"
};
- Console.WriteLine(stateMessage);
+ var topic = stateChanged.Producer.Topic;
+ Console.WriteLine($"The producer for topic '{topic}' " + stateMessage);
if (producer.IsFinalState(state))
return;
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index b42275d..d7c9f6c 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -78,19 +78,21 @@
while (true)
{
- state = await reader.StateChangedFrom(state).ConfigureAwait(false);
+ var stateChanged = await reader.StateChangedFrom(state).ConfigureAwait(false);
+ state = stateChanged.ReaderState;
var stateMessage = state switch
{
- ReaderState.Connected => "The reader is connected",
- ReaderState.Disconnected => "The reader is disconnected",
- ReaderState.Closed => "The reader has closed",
- ReaderState.ReachedEndOfTopic => "The reader has reached end of topic",
- ReaderState.Faulted => "The reader has faulted",
- _ => $"The reader has an unknown state '{state}'"
+ ReaderState.Connected => "is connected",
+ ReaderState.Disconnected => "is disconnected",
+ ReaderState.Closed => "has closed",
+ ReaderState.ReachedEndOfTopic => "has reached end of topic",
+ ReaderState.Faulted => "has faulted",
+ _ => $"has an unknown state '{state}'"
};
- Console.WriteLine(stateMessage);
+ var topic = stateChanged.Reader.Topic;
+ Console.WriteLine($"The reader for topic '{topic}' " + stateMessage);
if (reader.IsFinalState(state))
return;
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 4d33ab7..70bfb99 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -22,7 +22,7 @@
/// <summary>
/// A consumer abstraction.
/// </summary>
- public interface IConsumer : IStateChanged<ConsumerState>, IAsyncDisposable
+ public interface IConsumer : IAsyncDisposable
{
/// <summary>
/// Acknowledge the consumption of a single message.
@@ -50,6 +50,22 @@
ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
/// <summary>
+ /// Ask whether the current state is final, meaning that it will never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState();
+
+ /// <summary>
+ /// Ask whether the provided state is final, meaning that it will never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState(ConsumerState state);
+
+ /// <summary>
/// Get an IAsyncEnumerable for consuming messages
/// </summary>
IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
@@ -60,6 +76,28 @@
ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
/// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken = default);
+
+ /// <summary>
/// The topic of the consumer.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index 11e46d1..bd37d46 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -22,9 +22,25 @@
/// <summary>
/// A producer abstraction.
/// </summary>
- public interface IProducer : IStateChanged<ProducerState>, IAsyncDisposable
+ public interface IProducer : IAsyncDisposable
{
/// <summary>
+ /// Ask whether the current state is final, meaning that it will never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState();
+
+ /// <summary>
+ /// Ask whether the provided state is final, meaning that it will never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState(ProducerState state);
+
+ /// <summary>
/// Sends a message.
/// </summary>
ValueTask<MessageId> Send(byte[] data, CancellationToken cancellationToken = default);
@@ -55,6 +71,28 @@
ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
/// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state, CancellationToken cancellationToken = default);
+
+ /// <summary>
/// The topic of the producer.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 091551c..81bc9b4 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -17,18 +17,57 @@
using System;
using System.Collections.Generic;
using System.Threading;
+ using System.Threading.Tasks;
/// <summary>
/// A reader abstraction.
/// </summary>
- public interface IReader : IStateChanged<ReaderState>, IAsyncDisposable
+ public interface IReader : IAsyncDisposable
{
/// <summary>
+ /// Ask whether the current state is final, meaning that it will never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState();
+
+ /// <summary>
+ /// Ask whether the provided state is final, meaning that it will never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState(ReaderState state);
+
+ /// <summary>
/// Get an IAsyncEnumerable for reading messages
/// </summary>
IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
/// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state, CancellationToken cancellationToken = default);
+
+ /// <summary>
/// The topic of the reader.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/ConsumerStateChanged.cs b/src/DotPulsar/ConsumerStateChanged.cs
new file mode 100644
index 0000000..1f0a80e
--- /dev/null
+++ b/src/DotPulsar/ConsumerStateChanged.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+ using DotPulsar.Abstractions;
+
+ public sealed class ConsumerStateChanged
+ {
+ internal ConsumerStateChanged(IConsumer consumer, ConsumerState consumerState)
+ {
+ Consumer = consumer;
+ ConsumerState = consumerState;
+ }
+
+ public IConsumer Consumer { get; }
+ public ConsumerState ConsumerState { get; }
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IStateChanged.cs b/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
similarity index 97%
rename from src/DotPulsar/Abstractions/IStateChanged.cs
rename to src/DotPulsar/Internal/Abstractions/IStateChanged.cs
index 079b9be..732f13e 100644
--- a/src/DotPulsar/Abstractions/IStateChanged.cs
+++ b/src/DotPulsar/Internal/Abstractions/IStateChanged.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -12,7 +12,7 @@
* limitations under the License.
*/
-namespace DotPulsar.Abstractions
+namespace DotPulsar.Internal.Abstractions
{
using System.Threading;
using System.Threading.Tasks;
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 9fbdc37..2208606 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -57,11 +57,17 @@
_eventRegister.Register(new ConsumerCreated(_correlationId, this));
}
- public async ValueTask<ConsumerState> StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
- => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+ public async ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+ return new ConsumerStateChanged(this, newState);
+ }
- public async ValueTask<ConsumerState> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+ public async ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+ return new ConsumerStateChanged(this, newState);
+ }
public bool IsFinalState()
=> _state.IsFinalState();
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index d9e5dd0..28e0fe2 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -53,11 +53,17 @@
_eventRegister.Register(new ProducerCreated(_correlationId, this));
}
- public ValueTask<ProducerState> StateChangedTo(ProducerState state, CancellationToken cancellationToken)
- => _state.StateChangedTo(state, cancellationToken);
+ public async ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+ return new ProducerStateChanged(this, newState);
+ }
- public ValueTask<ProducerState> StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
- => _state.StateChangedFrom(state, cancellationToken);
+ public async ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+ return new ProducerStateChanged(this, newState);
+ }
public bool IsFinalState()
=> _state.IsFinalState();
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 5b7d740..bb659e7 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -54,11 +54,17 @@
_eventRegister.Register(new ReaderCreated(_correlationId, this));
}
- public async ValueTask<ReaderState> StateChangedTo(ReaderState state, CancellationToken cancellationToken)
- => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+ public async ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
+ return new ReaderStateChanged(this, newState);
+ }
- public async ValueTask<ReaderState> StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
- => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+ public async ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
+ {
+ var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
+ return new ReaderStateChanged(this, newState);
+ }
public bool IsFinalState()
=> _state.IsFinalState();
diff --git a/src/DotPulsar/ProducerStateChanged.cs b/src/DotPulsar/ProducerStateChanged.cs
new file mode 100644
index 0000000..f866954
--- /dev/null
+++ b/src/DotPulsar/ProducerStateChanged.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+ using DotPulsar.Abstractions;
+
+ public sealed class ProducerStateChanged
+ {
+ internal ProducerStateChanged(IProducer producer, ProducerState producerState)
+ {
+ Producer = producer;
+ ProducerState = producerState;
+ }
+
+ public IProducer Producer { get; }
+ public ProducerState ProducerState { get; }
+ }
+}
diff --git a/src/DotPulsar/ReaderStateChanged.cs b/src/DotPulsar/ReaderStateChanged.cs
new file mode 100644
index 0000000..4133abd
--- /dev/null
+++ b/src/DotPulsar/ReaderStateChanged.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar
+{
+ using DotPulsar.Abstractions;
+
+ public sealed class ReaderStateChanged
+ {
+ internal ReaderStateChanged(IReader reader, ReaderState readerState)
+ {
+ Reader = reader;
+ ReaderState = readerState;
+ }
+
+ public IReader Reader { get; }
+ public ReaderState ReaderState { get; }
+ }
+}