Refactoring and moving that can be moved to extension methods.
diff --git a/src/DotPulsar/Abstractions/IConsumer.cs b/src/DotPulsar/Abstractions/IConsumer.cs
index 48fe671..7fb4e16 100644
--- a/src/DotPulsar/Abstractions/IConsumer.cs
+++ b/src/DotPulsar/Abstractions/IConsumer.cs
@@ -22,97 +22,19 @@
/// <summary>
/// A consumer abstraction.
/// </summary>
- public interface IConsumer : IAsyncDisposable
+ public interface IConsumer : IGetLastMessageId, IReceive, ISeek, IState<ConsumerState>, IAsyncDisposable
{
/// <summary>
- /// Acknowledge the consumption of a single message.
- /// </summary>
- ValueTask Acknowledge(Message message, CancellationToken cancellationToken = default);
-
- /// <summary>
/// Acknowledge the consumption of a single message using the MessageId.
/// </summary>
ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken = default);
/// <summary>
- /// Acknowledge the consumption of all the messages in the topic up to and including the provided message.
- /// </summary>
- ValueTask AcknowledgeCumulative(Message message, CancellationToken cancellationToken = default);
-
- /// <summary>
/// Acknowledge the consumption of all the messages in the topic up to and including the provided MessageId.
/// </summary>
ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken = default);
/// <summary>
- /// Get the MessageId of the last message on the topic.
- /// </summary>
- ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Ask whether the current state is final, meaning that it will never change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState();
-
- /// <summary>
- /// Ask whether the provided state is final, meaning that it will never change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState(ConsumerState state);
-
- /// <summary>
- /// Get an IAsyncEnumerable for consuming messages.
- /// </summary>
- IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this consumer to a specific MessageId.
- /// </summary>
- ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this consumer to a specific message publish time using unix time in milliseconds.
- /// </summary>
- ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this consumer to a specific message publish time using an UTC DateTime.
- /// </summary>
- ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this consumer to a specific message publish time using a DateTimeOffset.
- /// </summary>
- ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change to a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will complete.
- /// </remarks>
- ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change from a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will complete.
- /// </remarks>
- ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken = default);
-
- /// <summary>
/// The topic of the consumer.
/// </summary>
string Topic { get; }
@@ -125,11 +47,11 @@
/// <summary>
/// Redeliver the pending messages that were pushed to this consumer that are not yet acknowledged.
/// </summary>
- ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken);
+ ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken = default);
/// <summary>
/// Redeliver all pending messages that were pushed to this consumer that are not yet acknowledged.
/// </summary>
- ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken);
+ ValueTask RedeliverUnacknowledgedMessages(CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs b/src/DotPulsar/Abstractions/IGetLastMessageId.cs
similarity index 63%
copy from src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
copy to src/DotPulsar/Abstractions/IGetLastMessageId.cs
index c651dbf..0c3059a 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
+++ b/src/DotPulsar/Abstractions/IGetLastMessageId.cs
@@ -12,13 +12,19 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions
+namespace DotPulsar.Abstractions
{
using System.Threading;
using System.Threading.Tasks;
- public interface IReaderChannelFactory
+ /// <summary>
+ /// An abstraction for getting the last message id.
+ /// </summary>
+ public interface IGetLastMessageId
{
- Task<IReaderChannel> Create(CancellationToken cancellationToken = default);
+ /// <summary>
+ /// Get the MessageId of the last message on the topic.
+ /// </summary>
+ ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Abstractions/IHandleStateChanged.cs b/src/DotPulsar/Abstractions/IHandleStateChanged.cs
index 04d6b60..8be3a99 100644
--- a/src/DotPulsar/Abstractions/IHandleStateChanged.cs
+++ b/src/DotPulsar/Abstractions/IHandleStateChanged.cs
@@ -25,7 +25,7 @@
/// <summary>
/// Called after a state has changed.
/// </summary>
- ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken cancellationToken);
+ ValueTask OnStateChanged(TStateChanged stateChanged, CancellationToken cancellationToken = default);
/// <summary>
/// The cancellation token to use when waiting for and handling state changes.
diff --git a/src/DotPulsar/Abstractions/IProducer.cs b/src/DotPulsar/Abstractions/IProducer.cs
index 03ff03e..27c59ee 100644
--- a/src/DotPulsar/Abstractions/IProducer.cs
+++ b/src/DotPulsar/Abstractions/IProducer.cs
@@ -15,84 +15,13 @@
namespace DotPulsar.Abstractions
{
using System;
- using System.Buffers;
- using System.Threading;
- using System.Threading.Tasks;
/// <summary>
/// A producer abstraction.
/// </summary>
- public interface IProducer : IAsyncDisposable
+ public interface IProducer : ISend, IState<ProducerState>, IAsyncDisposable
{
/// <summary>
- /// Ask whether the current state is final, meaning that it will never change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState();
-
- /// <summary>
- /// Ask whether the provided state is final, meaning that it will never change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState(ProducerState state);
-
- /// <summary>
- /// Sends a message.
- /// </summary>
- ValueTask<MessageId> Send(byte[] data, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message.
- /// </summary>
- ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message.
- /// </summary>
- ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message with metadata.
- /// </summary>
- ValueTask<MessageId> Send(MessageMetadata metadata, byte[] data, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message with metadata.
- /// </summary>
- ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Sends a message with metadata.
- /// </summary>
- ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change to a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will complete.
- /// </remarks>
- ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change from a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will complete.
- /// </remarks>
- ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state, CancellationToken cancellationToken = default);
-
- /// <summary>
/// The topic of the producer.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/Abstractions/IReader.cs b/src/DotPulsar/Abstractions/IReader.cs
index 1023419..6b4207b 100644
--- a/src/DotPulsar/Abstractions/IReader.cs
+++ b/src/DotPulsar/Abstractions/IReader.cs
@@ -15,84 +15,13 @@
namespace DotPulsar.Abstractions
{
using System;
- using System.Collections.Generic;
- using System.Threading;
- using System.Threading.Tasks;
/// <summary>
/// A reader abstraction.
/// </summary>
- public interface IReader : IAsyncDisposable
+ public interface IReader : IGetLastMessageId, IReceive, ISeek, IState<ReaderState>, IAsyncDisposable
{
/// <summary>
- /// Ask whether the current state is final, meaning that it will never change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState();
-
- /// <summary>
- /// Ask whether the provided state is final, meaning that it will never change.
- /// </summary>
- /// <returns>
- /// True if it's final and False if it's not.
- /// </returns>
- bool IsFinalState(ReaderState state);
-
- /// <summary>
- /// Get the MessageId of the last message on the topic.
- /// </summary>
- ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Get an IAsyncEnumerable for reading messages
- /// </summary>
- IAsyncEnumerable<Message> Messages(CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this reader to a specific MessageId.
- /// </summary>
- ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this reader to a specific message publish time using unix time in milliseconds.
- /// </summary>
- ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this reader to a specific message publish time using an UTC DateTime.
- /// </summary>
- ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Reset the subscription associated with this reader to a specific message publish time using a DateTimeOffset.
- /// </summary>
- ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change to a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will complete.
- /// </remarks>
- ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state, CancellationToken cancellationToken = default);
-
- /// <summary>
- /// Wait for the state to change from a specific state.
- /// </summary>
- /// <returns>
- /// The current state.
- /// </returns>
- /// <remarks>
- /// If the state change to a final state, then all awaiting tasks will complete.
- /// </remarks>
- ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state, CancellationToken cancellationToken = default);
-
- /// <summary>
/// The topic of the reader.
/// </summary>
string Topic { get; }
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs b/src/DotPulsar/Abstractions/IReceive.cs
similarity index 66%
rename from src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
rename to src/DotPulsar/Abstractions/IReceive.cs
index c651dbf..2ff275c 100644
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannelFactory.cs
+++ b/src/DotPulsar/Abstractions/IReceive.cs
@@ -12,13 +12,19 @@
* limitations under the License.
*/
-namespace DotPulsar.Internal.Abstractions
+namespace DotPulsar.Abstractions
{
using System.Threading;
using System.Threading.Tasks;
- public interface IReaderChannelFactory
+ /// <summary>
+ /// An abstraction for receiving a single message.
+ /// </summary>
+ public interface IReceive
{
- Task<IReaderChannel> Create(CancellationToken cancellationToken = default);
+ /// <summary>
+ /// Receive a single message.
+ /// </summary>
+ ValueTask<Message> Receive(CancellationToken cancellationToken = default);
}
}
diff --git a/src/DotPulsar/Abstractions/ISeek.cs b/src/DotPulsar/Abstractions/ISeek.cs
new file mode 100644
index 0000000..373c619
--- /dev/null
+++ b/src/DotPulsar/Abstractions/ISeek.cs
@@ -0,0 +1,35 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// An abstraction for seeking.
+ /// </summary>
+ public interface ISeek
+ {
+ /// <summary>
+ /// Reset the cursor associated with the consumer or reader to a specific MessageId.
+ /// </summary>
+ ValueTask Seek(MessageId messageId, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Reset the cursor associated with the consumer or reader to a specific message publish time using unix time in milliseconds.
+ /// </summary>
+ ValueTask Seek(ulong publishTime, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Abstractions/ISend.cs b/src/DotPulsar/Abstractions/ISend.cs
new file mode 100644
index 0000000..2948c34
--- /dev/null
+++ b/src/DotPulsar/Abstractions/ISend.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+ using System.Buffers;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// An abstraction for sending a message.
+ /// </summary>
+ public interface ISend
+ {
+ /// <summary>
+ /// Sends a message.
+ /// </summary>
+ ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Sends a message with metadata.
+ /// </summary>
+ ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Abstractions/IState.cs b/src/DotPulsar/Abstractions/IState.cs
new file mode 100644
index 0000000..48a975e
--- /dev/null
+++ b/src/DotPulsar/Abstractions/IState.cs
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Abstractions
+{
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// A state change monitoring abstraction.
+ /// </summary>
+ public interface IState<TState> where TState : notnull
+ {
+ /// <summary>
+ /// Ask whether the current state is final, meaning that it will never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState();
+
+ /// <summary>
+ /// Ask whether the provided state is final, meaning that it will never change.
+ /// </summary>
+ /// <returns>
+ /// True if it's final and False if it's not.
+ /// </returns>
+ bool IsFinalState(TState state);
+
+ /// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ ValueTask<TState> OnStateChangeTo(TState state, CancellationToken cancellationToken = default);
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ ValueTask<TState> OnStateChangeFrom(TState state, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/DotPulsar/Extensions/ConsumerExtensions.cs b/src/DotPulsar/Extensions/ConsumerExtensions.cs
new file mode 100644
index 0000000..b61971c
--- /dev/null
+++ b/src/DotPulsar/Extensions/ConsumerExtensions.cs
@@ -0,0 +1,68 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// Extensions for IConsumer.
+ /// </summary>
+ public static class ConsumerExtensions
+ {
+ /// <summary>
+ /// Acknowledge the consumption of a single message.
+ /// </summary>
+ public static async ValueTask Acknowledge(this IConsumer consumer, Message message, CancellationToken cancellationToken = default)
+ => await consumer.Acknowledge(message.MessageId, cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Acknowledge the consumption of all the messages in the topic up to and including the provided message.
+ /// </summary>
+ public static async ValueTask AcknowledgeCumulative(this IConsumer consumer, Message message, CancellationToken cancellationToken = default)
+ => await consumer.AcknowledgeCumulative(message.MessageId, cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ public static async ValueTask<ConsumerStateChanged> StateChangedTo(this IConsumer consumer, ConsumerState state, CancellationToken cancellationToken = default)
+ {
+ var newState = await consumer.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
+ return new ConsumerStateChanged(consumer, newState);
+ }
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ public static async ValueTask<ConsumerStateChanged> StateChangedFrom(this IConsumer consumer, ConsumerState state, CancellationToken cancellationToken = default)
+ {
+ var newState = await consumer.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
+ return new ConsumerStateChanged(consumer, newState);
+ }
+ }
+}
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs b/src/DotPulsar/Extensions/ProducerExtensions.cs
index 9c3007c..8f02262 100644
--- a/src/DotPulsar/Extensions/ProducerExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -16,7 +16,12 @@
{
using Abstractions;
using Internal;
+ using System.Threading;
+ using System.Threading.Tasks;
+ /// <summary>
+ /// Extensions for IProducer.
+ /// </summary>
public static class ProducerExtensions
{
/// <summary>
@@ -24,5 +29,35 @@
/// </summary>
public static IMessageBuilder NewMessage(this IProducer producer)
=> new MessageBuilder(producer);
+
+ /// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ public static async ValueTask<ProducerStateChanged> StateChangedTo(this IProducer producer, ProducerState state, CancellationToken cancellationToken = default)
+ {
+ var newState = await producer.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
+ return new ProducerStateChanged(producer, newState);
+ }
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ public static async ValueTask<ProducerStateChanged> StateChangedFrom(this IProducer producer, ProducerState state, CancellationToken cancellationToken = default)
+ {
+ var newState = await producer.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
+ return new ProducerStateChanged(producer, newState);
+ }
}
}
diff --git a/src/DotPulsar/Extensions/PulsarClientExtensions.cs b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
index 77e95aa..574bce1 100644
--- a/src/DotPulsar/Extensions/PulsarClientExtensions.cs
+++ b/src/DotPulsar/Extensions/PulsarClientExtensions.cs
@@ -17,6 +17,9 @@
using Abstractions;
using Internal;
+ /// <summary>
+ /// Extensions for IPulsarClient.
+ /// </summary>
public static class PulsarClientExtensions
{
/// <summary>
diff --git a/src/DotPulsar/Extensions/ReaderExtensions.cs b/src/DotPulsar/Extensions/ReaderExtensions.cs
new file mode 100644
index 0000000..86d093b
--- /dev/null
+++ b/src/DotPulsar/Extensions/ReaderExtensions.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// Extensions for IReader.
+ /// </summary>
+ public static class ReaderExtensions
+ {
+ /// <summary>
+ /// Wait for the state to change to a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ public static async ValueTask<ReaderStateChanged> StateChangedTo(this IReader reader, ReaderState state, CancellationToken cancellationToken = default)
+ {
+ var newState = await reader.OnStateChangeTo(state, cancellationToken).ConfigureAwait(false);
+ return new ReaderStateChanged(reader, newState);
+ }
+
+ /// <summary>
+ /// Wait for the state to change from a specific state.
+ /// </summary>
+ /// <returns>
+ /// The current state.
+ /// </returns>
+ /// <remarks>
+ /// If the state change to a final state, then all awaiting tasks will complete.
+ /// </remarks>
+ public static async ValueTask<ReaderStateChanged> StateChangedFrom(this IReader reader, ReaderState state, CancellationToken cancellationToken = default)
+ {
+ var newState = await reader.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
+ return new ReaderStateChanged(reader, newState);
+ }
+ }
+}
diff --git a/src/DotPulsar/Extensions/ReceiveExtensions.cs b/src/DotPulsar/Extensions/ReceiveExtensions.cs
new file mode 100644
index 0000000..c2262c6
--- /dev/null
+++ b/src/DotPulsar/Extensions/ReceiveExtensions.cs
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using System.Collections.Generic;
+ using System.Runtime.CompilerServices;
+ using System.Threading;
+
+ /// <summary>
+ /// Extensions for IReceive.
+ /// </summary>
+ public static class ReceiveExtensions
+ {
+ /// <summary>
+ /// Get an IAsyncEnumerable for receiving messages.
+ /// </summary>
+ public static async IAsyncEnumerable<Message> Messages(this IReceive receiver, [EnumeratorCancellation] CancellationToken cancellationToken = default)
+ {
+ while (!cancellationToken.IsCancellationRequested)
+ yield return await receiver.Receive(cancellationToken).ConfigureAwait(false);
+ }
+ }
+}
diff --git a/src/DotPulsar/Extensions/SeekExtensions.cs b/src/DotPulsar/Extensions/SeekExtensions.cs
new file mode 100644
index 0000000..b4f0d2c
--- /dev/null
+++ b/src/DotPulsar/Extensions/SeekExtensions.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using DotPulsar.Abstractions;
+ using System;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// Extensions for ISeek.
+ /// </summary>
+ public static class SeekExtensions
+ {
+ /// <summary>
+ /// Reset the cursor associated with the consumer or reader to a specific message publish time using an UTC DateTime.
+ /// </summary>
+ public static async ValueTask Seek(this ISeek seeker, DateTime publishTime, CancellationToken cancellationToken = default)
+ => await seeker.Seek((ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds(), cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Reset the cursor associated with the consumer or reader to a specific message publish time using a DateTimeOffset.
+ /// </summary>
+ public static async ValueTask Seek(this ISeek seeker, DateTimeOffset publishTime, CancellationToken cancellationToken = default)
+ => await seeker.Seek((ulong) publishTime.ToUnixTimeMilliseconds(), cancellationToken).ConfigureAwait(false);
+ }
+}
diff --git a/src/DotPulsar/Extensions/SendExtensions.cs b/src/DotPulsar/Extensions/SendExtensions.cs
new file mode 100644
index 0000000..41ace1e
--- /dev/null
+++ b/src/DotPulsar/Extensions/SendExtensions.cs
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Extensions
+{
+ using Abstractions;
+ using System;
+ using System.Buffers;
+ using System.Threading;
+ using System.Threading.Tasks;
+
+ /// <summary>
+ /// Extensions for ISend.
+ /// </summary>
+ public static class SendExtensions
+ {
+ /// <summary>
+ /// Sends a message.
+ /// </summary>
+ public static async ValueTask<MessageId> Send(this ISend sender, byte[] data, CancellationToken cancellationToken = default)
+ => await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Sends a message.
+ /// </summary>
+ public static async ValueTask<MessageId> Send(this ISend sender, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
+ => await sender.Send(new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Sends a message with metadata.
+ /// </summary>
+ public static async ValueTask<MessageId> Send(this ISend sender, MessageMetadata metadata, byte[] data, CancellationToken cancellationToken = default)
+ => await sender.Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+
+ /// <summary>
+ /// Sends a message with metadata.
+ /// </summary>
+ public static async ValueTask<MessageId> Send(this ISend sender, MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken = default)
+ => await sender.Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken).ConfigureAwait(false);
+ }
+}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index 31140b2..ef09431 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -23,9 +23,9 @@
{
Task Send(CommandAck command, CancellationToken cancellationToken);
Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken);
- Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
- Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
- Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
+ Task Send(CommandUnsubscribe command, CancellationToken cancellationToken);
+ Task Send(CommandSeek command, CancellationToken cancellationToken);
+ Task<MessageId> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
ValueTask<Message> Receive(CancellationToken cancellationToken);
ValueTask ClosedByClient(CancellationToken cancellationToken);
}
diff --git a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs b/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
deleted file mode 100644
index 55135ea..0000000
--- a/src/DotPulsar/Internal/Abstractions/IReaderChannel.cs
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-namespace DotPulsar.Internal.Abstractions
-{
- using DotPulsar.Internal.PulsarApi;
- using System;
- using System.Threading;
- using System.Threading.Tasks;
-
- public interface IReaderChannel : IAsyncDisposable
- {
- Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
- Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
- ValueTask<Message> Receive(CancellationToken cancellationToken);
- ValueTask ClosedByClient(CancellationToken cancellationToken);
- }
-}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index c6e9dfc..d263df2 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -17,13 +17,13 @@
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+ using DotPulsar.Internal.Extensions;
using Events;
using Microsoft.Extensions.ObjectPool;
using PulsarApi;
using System;
using System.Collections.Generic;
using System.Linq;
- using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -59,17 +59,11 @@
_eventRegister.Register(new ConsumerCreated(_correlationId, this));
}
- public async ValueTask<ConsumerStateChanged> StateChangedTo(ConsumerState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
- return new ConsumerStateChanged(this, newState);
- }
+ public async ValueTask<ConsumerState> OnStateChangeTo(ConsumerState state, CancellationToken cancellationToken)
+ => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
- public async ValueTask<ConsumerStateChanged> StateChangedFrom(ConsumerState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
- return new ConsumerStateChanged(this, newState);
- }
+ public async ValueTask<ConsumerState> OnStateChangeFrom(ConsumerState state, CancellationToken cancellationToken)
+ => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
public bool IsFinalState()
=> _state.IsFinalState();
@@ -87,35 +81,28 @@
await _channel.DisposeAsync().ConfigureAwait(false);
}
- public async IAsyncEnumerable<Message> Messages([EnumeratorCancellation] CancellationToken cancellationToken)
+ public async ValueTask<Message> Receive(CancellationToken cancellationToken)
{
ThrowIfDisposed();
- while (!cancellationToken.IsCancellationRequested)
- yield return await _executor.Execute(() => Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
+ return await _executor.Execute(() => ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<Message> Receive(CancellationToken cancellationToken)
+ private async ValueTask<Message> ReceiveMessage(CancellationToken cancellationToken)
=> await _channel.Receive(cancellationToken).ConfigureAwait(false);
- public async ValueTask Acknowledge(Message message, CancellationToken cancellationToken)
- => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
-
public async ValueTask Acknowledge(MessageId messageId, CancellationToken cancellationToken)
- => await Acknowledge(messageId.Data, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
-
- public async ValueTask AcknowledgeCumulative(Message message, CancellationToken cancellationToken)
- => await Acknowledge(message.MessageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
+ => await Acknowledge(messageId, CommandAck.AckType.Individual, cancellationToken).ConfigureAwait(false);
public async ValueTask AcknowledgeCumulative(MessageId messageId, CancellationToken cancellationToken)
- => await Acknowledge(messageId.Data, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
+ => await Acknowledge(messageId, CommandAck.AckType.Cumulative, cancellationToken).ConfigureAwait(false);
public async ValueTask RedeliverUnacknowledgedMessages(IEnumerable<MessageId> messageIds, CancellationToken cancellationToken)
{
ThrowIfDisposed();
var command = new CommandRedeliverUnacknowledgedMessages();
- command.MessageIds.AddRange(messageIds.Select(m => m.Data));
+ command.MessageIds.AddRange(messageIds.Select(messageId => messageId.ToMessageIdData()));
await _executor.Execute(() => RedeliverUnacknowledgedMessages(command, cancellationToken), cancellationToken).ConfigureAwait(false);
}
@@ -131,16 +118,14 @@
}
private async ValueTask Unsubscribe(CommandUnsubscribe command, CancellationToken cancellationToken)
- {
- _ = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- }
+ =>await _channel.Send(command, cancellationToken).ConfigureAwait(false);
public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var seek = new CommandSeek { MessageId = messageId.Data };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
+ await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
@@ -148,23 +133,7 @@
ThrowIfDisposed();
var seek = new CommandSeek { MessagePublishTime = publishTime };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
-
- var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
-
- var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
}
public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
@@ -176,22 +145,21 @@
}
private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
- {
- var response = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- return new MessageId(response.LastMessageId);
- }
-
- private async ValueTask<CommandSuccess> Seek(CommandSeek command, CancellationToken cancellationToken)
=> await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- private async ValueTask Acknowledge(MessageIdData messageIdData, CommandAck.AckType ackType, CancellationToken cancellationToken)
+ private async Task Seek(CommandSeek command, CancellationToken cancellationToken)
+ => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+
+ private async ValueTask Acknowledge(MessageId messageId, CommandAck.AckType ackType, CancellationToken cancellationToken)
{
ThrowIfDisposed();
var commandAck = _commandAckPool.Get();
commandAck.Type = ackType;
- commandAck.MessageIds.Clear();
- commandAck.MessageIds.Add(messageIdData);
+ if (commandAck.MessageIds.Count == 0)
+ commandAck.MessageIds.Add(messageId.ToMessageIdData());
+ else
+ commandAck.MessageIds[0].MapFrom(messageId);
try
{
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 775e240..de109dd 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -21,7 +21,7 @@
using System.Threading;
using System.Threading.Tasks;
- public sealed class ConsumerChannel : IConsumerChannel, IReaderChannel
+ public sealed class ConsumerChannel : IConsumerChannel
{
private readonly ulong _id;
private readonly AsyncQueue<MessagePackage> _queue;
@@ -88,7 +88,7 @@
return metadata.ShouldSerializeNumMessagesInBatch()
? _batchHandler.Add(messageId, redeliveryCount, metadata, data)
- : MessageFactory.Create(new MessageId(messageId), redeliveryCount, metadata, data);
+ : MessageFactory.Create(messageId.ToMessageId(), redeliveryCount, metadata, data);
}
}
}
@@ -117,29 +117,27 @@
await _connection.Send(command, cancellationToken).ConfigureAwait(false);
}
- public async Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
+ public async Task Send(CommandUnsubscribe command, CancellationToken cancellationToken)
{
command.ConsumerId = _id;
var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Success);
- return response.Success;
}
- public async Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken)
+ public async Task Send(CommandSeek command, CancellationToken cancellationToken)
{
command.ConsumerId = _id;
var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.Success);
_batchHandler.Clear();
- return response.Success;
}
- public async Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+ public async Task<MessageId> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
{
command.ConsumerId = _id;
var response = await _connection.Send(command, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
- return response.GetLastMessageIdResponse;
+ return response.GetLastMessageIdResponse.LastMessageId.ToMessageId();
}
public async ValueTask DisposeAsync()
diff --git a/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs b/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs
new file mode 100644
index 0000000..857b00c
--- /dev/null
+++ b/src/DotPulsar/Internal/Extensions/MessageIdDataExtensions.cs
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal.Extensions
+{
+ using DotPulsar.Internal.PulsarApi;
+
+ public static class MessageIdDataExtensions
+ {
+ public static MessageId ToMessageId(this MessageIdData messageIdData)
+ => new MessageId(messageIdData.LedgerId, messageIdData.EntryId, messageIdData.Partition, messageIdData.BatchIndex);
+
+ public static void MapFrom(this MessageIdData destination, MessageId source)
+ {
+ destination.LedgerId = source.LedgerId;
+ destination.EntryId = source.EntryId;
+ destination.Partition = source.Partition;
+ destination.BatchIndex = source.BatchIndex;
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/MessageBuilder.cs b/src/DotPulsar/Internal/MessageBuilder.cs
index d7c7b02..c17ec19 100644
--- a/src/DotPulsar/Internal/MessageBuilder.cs
+++ b/src/DotPulsar/Internal/MessageBuilder.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal
{
using DotPulsar.Abstractions;
+ using DotPulsar.Extensions;
using Extensions;
using System;
using System.Threading;
diff --git a/src/DotPulsar/Internal/MonitorState.cs b/src/DotPulsar/Internal/MonitorState.cs
index 28e0f9a..7a50044 100644
--- a/src/DotPulsar/Internal/MonitorState.cs
+++ b/src/DotPulsar/Internal/MonitorState.cs
@@ -15,6 +15,7 @@
namespace DotPulsar.Internal
{
using DotPulsar.Abstractions;
+ using DotPulsar.Extensions;
using System.Threading.Tasks;
public static class StateMonitor
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index e463ff6..75e724f 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -22,7 +22,7 @@
using System.Threading;
using System.Threading.Tasks;
- public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel, IReaderChannel
+ public sealed class NotReadyChannel : IConsumerChannel, IProducerChannel
{
public ValueTask DisposeAsync()
=> new ValueTask();
@@ -42,13 +42,13 @@
public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken)
=> throw GetException();
- public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
+ public Task Send(CommandUnsubscribe command, CancellationToken cancellationToken)
=> throw GetException();
- public Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken)
+ public Task Send(CommandSeek command, CancellationToken cancellationToken)
=> throw GetException();
- public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
+ public Task<MessageId> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
=> throw GetException();
private static Exception GetException()
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 2347eee..12e4625 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -17,6 +17,7 @@
using Abstractions;
using DotPulsar.Abstractions;
using DotPulsar.Exceptions;
+ using DotPulsar.Internal.Extensions;
using Events;
using Microsoft.Extensions.ObjectPool;
using System;
@@ -60,17 +61,11 @@
_eventRegister.Register(new ProducerCreated(_correlationId, this));
}
- public async ValueTask<ProducerStateChanged> StateChangedTo(ProducerState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
- return new ProducerStateChanged(this, newState);
- }
+ public async ValueTask<ProducerState> OnStateChangeTo(ProducerState state, CancellationToken cancellationToken)
+ => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
- public async ValueTask<ProducerStateChanged> StateChangedFrom(ProducerState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
- return new ProducerStateChanged(this, newState);
- }
+ public async ValueTask<ProducerState> OnStateChangeFrom(ProducerState state, CancellationToken cancellationToken)
+ => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
public bool IsFinalState()
=> _state.IsFinalState();
@@ -88,12 +83,6 @@
await _channel.DisposeAsync().ConfigureAwait(false);
}
- public ValueTask<MessageId> Send(byte[] data, CancellationToken cancellationToken)
- => Send(new ReadOnlySequence<byte>(data), cancellationToken);
-
- public ValueTask<MessageId> Send(ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
- => Send(new ReadOnlySequence<byte>(data), cancellationToken);
-
public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -110,12 +99,6 @@
}
}
- public ValueTask<MessageId> Send(MessageMetadata metadata, byte[] data, CancellationToken cancellationToken)
- => Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken);
-
- public ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlyMemory<byte> data, CancellationToken cancellationToken)
- => Send(metadata, new ReadOnlySequence<byte>(data), cancellationToken);
-
public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
@@ -138,7 +121,7 @@
private async ValueTask<MessageId> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
var response = await _channel.Send(metadata, data, cancellationToken).ConfigureAwait(false);
- return new MessageId(response.MessageId);
+ return response.MessageId.ToMessageId();
}
internal async ValueTask SetChannel(IProducerChannel channel)
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 571c5b6..ccbea59 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -20,8 +20,6 @@
using DotPulsar.Internal.PulsarApi;
using Events;
using System;
- using System.Collections.Generic;
- using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@@ -29,7 +27,7 @@
{
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
- private IReaderChannel _channel;
+ private IConsumerChannel _channel;
private readonly IExecute _executor;
private readonly IStateChanged<ReaderState> _state;
private int _isDisposed;
@@ -40,7 +38,7 @@
Guid correlationId,
string topic,
IRegisterEvent eventRegister,
- IReaderChannel initialChannel,
+ IConsumerChannel initialChannel,
IExecute executor,
IStateChanged<ReaderState> state)
{
@@ -55,17 +53,11 @@
_eventRegister.Register(new ReaderCreated(_correlationId, this));
}
- public async ValueTask<ReaderStateChanged> StateChangedTo(ReaderState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
- return new ReaderStateChanged(this, newState);
- }
+ public async ValueTask<ReaderState> OnStateChangeTo(ReaderState state, CancellationToken cancellationToken)
+ => await _state.StateChangedTo(state, cancellationToken).ConfigureAwait(false);
- public async ValueTask<ReaderStateChanged> StateChangedFrom(ReaderState state, CancellationToken cancellationToken)
- {
- var newState = await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
- return new ReaderStateChanged(this, newState);
- }
+ public async ValueTask<ReaderState> OnStateChangeFrom(ReaderState state, CancellationToken cancellationToken)
+ => await _state.StateChangedFrom(state, cancellationToken).ConfigureAwait(false);
public bool IsFinalState()
=> _state.IsFinalState();
@@ -82,28 +74,24 @@
}
private async ValueTask<MessageId> GetLastMessageId(CommandGetLastMessageId command, CancellationToken cancellationToken)
- {
- var response = await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- return new MessageId(response.LastMessageId);
- }
+ => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- public async IAsyncEnumerable<Message> Messages([EnumeratorCancellation] CancellationToken cancellationToken)
+ public async ValueTask<Message> Receive(CancellationToken cancellationToken)
{
ThrowIfDisposed();
- while (!cancellationToken.IsCancellationRequested)
- yield return await _executor.Execute(() => Receive(cancellationToken), cancellationToken).ConfigureAwait(false);
+ return await _executor.Execute(() => ReceiveMessage(cancellationToken), cancellationToken).ConfigureAwait(false);
}
- private async ValueTask<Message> Receive(CancellationToken cancellationToken)
+ private async ValueTask<Message> ReceiveMessage(CancellationToken cancellationToken)
=> await _channel.Receive(cancellationToken).ConfigureAwait(false);
public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var seek = new CommandSeek { MessageId = messageId.Data };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ var seek = new CommandSeek { MessageId = messageId.ToMessageIdData() };
+ await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
}
public async ValueTask Seek(ulong publishTime, CancellationToken cancellationToken)
@@ -111,23 +99,7 @@
ThrowIfDisposed();
var seek = new CommandSeek { MessagePublishTime = publishTime };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Seek(DateTime publishTime, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
-
- var seek = new CommandSeek { MessagePublishTime = (ulong) new DateTimeOffset(publishTime).ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
- }
-
- public async ValueTask Seek(DateTimeOffset publishTime, CancellationToken cancellationToken)
- {
- ThrowIfDisposed();
-
- var seek = new CommandSeek { MessagePublishTime = (ulong) publishTime.ToUnixTimeMilliseconds() };
- _ = await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
+ await _executor.Execute(() => Seek(seek, cancellationToken), cancellationToken).ConfigureAwait(false);
}
public async ValueTask DisposeAsync()
@@ -140,10 +112,10 @@
await _channel.DisposeAsync().ConfigureAwait(false);
}
- private async ValueTask<CommandSuccess> Seek(CommandSeek command, CancellationToken cancellationToken)
+ private async Task Seek(CommandSeek command, CancellationToken cancellationToken)
=> await _channel.Send(command, cancellationToken).ConfigureAwait(false);
- internal async ValueTask SetChannel(IReaderChannel channel)
+ internal async ValueTask SetChannel(IConsumerChannel channel)
{
if (_isDisposed != 0)
{
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index 9efe033..58b31e1 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -20,7 +20,7 @@
using System.Threading;
using System.Threading.Tasks;
- public sealed class ReaderChannelFactory : IReaderChannelFactory
+ public sealed class ReaderChannelFactory : IConsumerChannelFactory
{
private readonly Guid _correlationId;
private readonly IRegisterEvent _eventRegister;
@@ -48,7 +48,7 @@
ConsumerName = options.ReaderName,
Durable = false,
ReadCompacted = options.ReadCompacted,
- StartMessageId = options.StartMessageId.Data,
+ StartMessageId = options.StartMessageId.ToMessageIdData(),
Subscription = $"Reader-{Guid.NewGuid():N}",
Topic = options.Topic
};
@@ -56,10 +56,10 @@
_batchHandler = new BatchHandler(false);
}
- public async Task<IReaderChannel> Create(CancellationToken cancellationToken)
+ public async Task<IConsumerChannel> Create(CancellationToken cancellationToken)
=> await _executor.Execute(() => GetChannel(cancellationToken), cancellationToken).ConfigureAwait(false);
- private async ValueTask<IReaderChannel> GetChannel(CancellationToken cancellationToken)
+ private async ValueTask<IConsumerChannel> GetChannel(CancellationToken cancellationToken)
{
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken).ConfigureAwait(false);
var messageQueue = new AsyncQueue<MessagePackage>();
diff --git a/src/DotPulsar/Internal/ReaderProcess.cs b/src/DotPulsar/Internal/ReaderProcess.cs
index 67763c5..404de7e 100644
--- a/src/DotPulsar/Internal/ReaderProcess.cs
+++ b/src/DotPulsar/Internal/ReaderProcess.cs
@@ -21,13 +21,13 @@
public sealed class ReaderProcess : Process
{
private readonly IStateManager<ReaderState> _stateManager;
- private readonly IReaderChannelFactory _factory;
+ private readonly IConsumerChannelFactory _factory;
private readonly Reader _reader;
public ReaderProcess(
Guid correlationId,
IStateManager<ReaderState> stateManager,
- IReaderChannelFactory factory,
+ IConsumerChannelFactory factory,
Reader reader) : base(correlationId)
{
_stateManager = stateManager;
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
index cdcc2eb..bc32039 100644
--- a/src/DotPulsar/MessageId.cs
+++ b/src/DotPulsar/MessageId.cs
@@ -14,6 +14,7 @@
namespace DotPulsar
{
+ using DotPulsar.Internal.Extensions;
using Internal.PulsarApi;
using System;
@@ -38,42 +39,36 @@
/// </summary>
public static MessageId Latest { get; }
- internal MessageId(MessageIdData messageIdData)
- => Data = messageIdData;
-
/// <summary>
/// Initializes a new instance using the specified ledgerId, entryId, partition and batchIndex.
/// </summary>
public MessageId(ulong ledgerId, ulong entryId, int partition, int batchIndex)
- => Data = new MessageIdData
- {
- LedgerId = ledgerId,
- EntryId = entryId,
- Partition = partition,
- BatchIndex = batchIndex
- };
-
- internal MessageIdData Data { get; }
+ {
+ LedgerId = ledgerId;
+ EntryId = entryId;
+ Partition = partition;
+ BatchIndex = batchIndex;
+ }
/// <summary>
/// The id of the ledger.
/// </summary>
- public ulong LedgerId => Data.LedgerId;
+ public ulong LedgerId { get; }
/// <summary>
/// The id of the entry.
/// </summary>
- public ulong EntryId => Data.EntryId;
+ public ulong EntryId { get; }
/// <summary>
/// The partition.
/// </summary>
- public int Partition => Data.Partition;
+ public int Partition { get; }
/// <summary>
/// The batch index.
/// </summary>
- public int BatchIndex => Data.BatchIndex;
+ public int BatchIndex { get; }
public int CompareTo(MessageId? other)
{
@@ -124,5 +119,12 @@
public override string ToString()
=> $"{LedgerId}:{EntryId}:{Partition}:{BatchIndex}";
+
+ internal MessageIdData ToMessageIdData()
+ {
+ var messageIdData = new MessageIdData();
+ messageIdData.MapFrom(this);
+ return messageIdData;
+ }
}
}