Moving SequenceId from the ProducerChannel to the Producer.
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index 134d406..8db5b3d 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -61,8 +61,10 @@
{
while (!cancellationToken.IsCancellationRequested)
{
- var data = Encoding.UTF8.GetBytes("Sent " + DateTime.UtcNow);
- _ = await producer.Send(data, cancellationToken).ConfigureAwait(false);
+ var data = DateTime.UtcNow.ToLongTimeString();
+ var bytes = Encoding.UTF8.GetBytes(data);
+ _ = await producer.Send(bytes, cancellationToken).ConfigureAwait(false);
+ Console.WriteLine("Sent: " + data);
await Task.Delay(delay).ConfigureAwait(false);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index 8375447..9fe2eb1 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -22,7 +22,7 @@
public interface IProducerChannel : IAsyncDisposable
{
- Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+ Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 94a68b9..f98d9fd 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -66,7 +66,6 @@
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
-
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
}
}
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index 93f572a..fee9bb4 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -30,7 +30,7 @@
public ValueTask<Message> Receive(CancellationToken cancellationToken = default)
=> throw GetException();
- public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+ public Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
=> throw GetException();
public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index efe5bc0..da2aacd 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -30,6 +30,7 @@
private IProducerChannel _channel;
private readonly IExecute _executor;
private readonly IStateChanged<ProducerState> _state;
+ private readonly SequenceId _sequenceId;
private int _isDisposed;
public string Topic { get; }
@@ -37,6 +38,7 @@
public Producer(
Guid correlationId,
string topic,
+ ulong initialSequenceId,
IRegisterEvent registerEvent,
IProducerChannel initialChannel,
IExecute executor,
@@ -44,6 +46,7 @@
{
_correlationId = correlationId;
Topic = topic;
+ _sequenceId = new SequenceId(initialSequenceId);
_eventRegister = registerEvent;
_channel = initialChannel;
_executor = executor;
@@ -90,7 +93,8 @@
public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(data, cancellationToken), cancellationToken).ConfigureAwait(false);
+ var sequenceId = _sequenceId.FetchNext();
+ var response = await _executor.Execute(() => _channel.Send(sequenceId, data, cancellationToken), cancellationToken).ConfigureAwait(false);
return new MessageId(response.MessageId);
}
@@ -103,8 +107,21 @@
public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
- return new MessageId(response.MessageId);
+
+ var autoAssignSequenceId = metadata.SequenceId == 0;
+ if (autoAssignSequenceId)
+ metadata.SequenceId = _sequenceId.FetchNext();
+
+ try
+ {
+ var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken).ConfigureAwait(false);
+ return new MessageId(response.MessageId);
+ }
+ finally
+ {
+ if (autoAssignSequenceId)
+ metadata.SequenceId = 0;
+ }
}
internal async ValueTask SetChannel(IProducerChannel channel)
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index 0ba8cd1..e7e062b 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
@@ -25,20 +25,20 @@
public sealed class ProducerChannel : IProducerChannel
{
- private readonly MessageMetadata _cachedMetadata;
+ private readonly ObjectPool<MessageMetadata> _messageMetadataPool;
private readonly ObjectPool<SendPackage> _sendPackagePool;
private readonly ulong _id;
private readonly string _name;
- private readonly SequenceId _sequenceId;
private readonly IConnection _connection;
- public ProducerChannel(ulong id, string name, SequenceId sequenceId, IConnection connection)
+ public ProducerChannel(ulong id, string name, IConnection connection)
{
- _cachedMetadata = new MessageMetadata { ProducerName = name };
- _sendPackagePool = new DefaultObjectPool<SendPackage>(new DefaultPooledObjectPolicy<SendPackage>());
+ var messageMetadataPolicy = new DefaultPooledObjectPolicy<MessageMetadata>();
+ _messageMetadataPool = new DefaultObjectPool<MessageMetadata>(messageMetadataPolicy);
+ var sendPackagePolicy = new DefaultPooledObjectPolicy<SendPackage>();
+ _sendPackagePool = new DefaultObjectPool<SendPackage>(sendPackagePolicy);
_id = id;
_name = name;
- _sequenceId = sequenceId;
_connection = connection;
}
@@ -55,19 +55,30 @@
}
}
- public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
- => SendPackage(_cachedMetadata, payload, true, cancellationToken);
+ public Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
+ {
+ var metadata = _messageMetadataPool.Get();
+ metadata.ProducerName = _name;
+ metadata.SequenceId = sequenceId;
+ try
+ {
+ return SendPackage(metadata, payload, cancellationToken);
+ }
+ finally
+ {
+ _messageMetadataPool.Return(metadata);
+ }
+ }
public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
metadata.ProducerName = _name;
- return SendPackage(metadata, payload, metadata.SequenceId == 0, cancellationToken);
+ return SendPackage(metadata, payload, cancellationToken);
}
private async Task<CommandSendReceipt> SendPackage(
MessageMetadata metadata,
ReadOnlySequence<byte> payload,
- bool autoAssignSequenceId,
CancellationToken cancellationToken)
{
var sendPackage = _sendPackagePool.Get();
@@ -81,36 +92,21 @@
};
}
+ sendPackage.Command.SequenceId = metadata.SequenceId;
sendPackage.Metadata = metadata;
sendPackage.Payload = payload;
+ metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); // TODO Benchmark against StopWatch
try
{
- metadata.PublishTime = (ulong) DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
-
- if (autoAssignSequenceId)
- {
- var newSequenceId = _sequenceId.FetchNext();
- sendPackage.Command.SequenceId = newSequenceId;
- sendPackage.Metadata.SequenceId = newSequenceId;
- }
- else
- sendPackage.Command.SequenceId = sendPackage.Metadata.SequenceId;
-
var response = await _connection.Send(sendPackage, cancellationToken).ConfigureAwait(false);
response.Expect(BaseCommand.Type.SendReceipt);
-
return response.SendReceipt;
}
finally
{
- // Reset in case the user reuse the MessageMetadata, but is not explicitly setting the sequenceId
- if (autoAssignSequenceId)
- sendPackage.Metadata.SequenceId = 0;
-
_sendPackagePool.Return(sendPackage);
}
}
}
}
-
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index e7d8327..87c76de 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -26,7 +26,6 @@
private readonly IRegisterEvent _eventRegister;
private readonly IConnectionPool _connectionPool;
private readonly IExecute _executor;
- private readonly SequenceId _sequenceId;
private readonly CommandProducer _commandProducer;
public ProducerChannelFactory(
@@ -40,7 +39,6 @@
_eventRegister = eventRegister;
_connectionPool = connectionPool;
_executor = executor;
- _sequenceId = new SequenceId(options.InitialSequenceId);
_commandProducer = new CommandProducer
{
@@ -57,8 +55,7 @@
var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken).ConfigureAwait(false);
var channel = new Channel(_correlationId, _eventRegister, new AsyncQueue<MessagePackage>());
var response = await connection.Send(_commandProducer, channel, cancellationToken).ConfigureAwait(false);
-
- return new ProducerChannel(response.ProducerId, response.ProducerName, _sequenceId, connection);
+ return new ProducerChannel(response.ProducerId, response.ProducerName, connection);
}
}
}
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index 03140ad..9efe033 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -65,7 +65,6 @@
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
var response = await connection.Send(_subscribe, channel, cancellationToken).ConfigureAwait(false);
-
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
}
}
diff --git a/src/DotPulsar/Internal/RequestId.cs b/src/DotPulsar/Internal/RequestId.cs
new file mode 100644
index 0000000..128ac5e
--- /dev/null
+++ b/src/DotPulsar/Internal/RequestId.cs
@@ -0,0 +1,30 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+ public sealed class RequestId
+ {
+ private ulong _current;
+
+ public RequestId()
+ => _current = 0;
+
+ public bool IsPastInitialId()
+ => _current != 0;
+
+ public ulong FetchNext()
+ => _current++;
+ }
+}
diff --git a/src/DotPulsar/Internal/RequestResponseHandler.cs b/src/DotPulsar/Internal/RequestResponseHandler.cs
index c855337..1885aa4 100644
--- a/src/DotPulsar/Internal/RequestResponseHandler.cs
+++ b/src/DotPulsar/Internal/RequestResponseHandler.cs
@@ -23,12 +23,12 @@
private const string ConnectResponseIdentifier = "Connected";
private readonly Awaiter<string, BaseCommand> _responses;
- private SequenceId _requestId;
+ private RequestId _requestId;
public RequestResponseHandler()
{
_responses = new Awaiter<string, BaseCommand>();
- _requestId = new SequenceId(1);
+ _requestId = new RequestId();
}
public void Dispose()
diff --git a/src/DotPulsar/Internal/SequenceId.cs b/src/DotPulsar/Internal/SequenceId.cs
index 55ba94a..aba1c25 100644
--- a/src/DotPulsar/Internal/SequenceId.cs
+++ b/src/DotPulsar/Internal/SequenceId.cs
@@ -12,32 +12,22 @@
* limitations under the License.
*/
-using System.Threading;
-
namespace DotPulsar.Internal
{
+ using System.Threading;
+
public sealed class SequenceId
{
private long _current;
- private ulong _initial;
public SequenceId(ulong initialSequenceId)
{
// Subtracting one because Interlocked.Increment will return the post-incremented value
// which is expected to be the initialSequenceId for the first call
- _current = unchecked((long)initialSequenceId - 1);
- _initial = initialSequenceId - 1;
- }
-
- // Returns false if FetchNext has not been called on this object before (or if it somehow wrapped around 2^64)
- public bool IsPastInitialId()
- {
- return unchecked((ulong)_current != _initial);
+ _current = unchecked((long) initialSequenceId - 1);
}
public ulong FetchNext()
- {
- return unchecked((ulong)Interlocked.Increment(ref _current));
- }
+ => unchecked((ulong) Interlocked.Increment(ref _current));
}
}
diff --git a/src/DotPulsar/PulsarClient.cs b/src/DotPulsar/PulsarClient.cs
index 260f3ce..982ee3b 100644
--- a/src/DotPulsar/PulsarClient.cs
+++ b/src/DotPulsar/PulsarClient.cs
@@ -57,7 +57,7 @@
var executor = new Executor(correlationId, _processManager, _exceptionHandler);
var factory = new ProducerChannelFactory(correlationId, _processManager, _connectionPool, executor, options);
var stateManager = new StateManager<ProducerState>(ProducerState.Disconnected, ProducerState.Closed, ProducerState.Faulted);
- var producer = new Producer(correlationId, options.Topic, _processManager, new NotReadyChannel(), executor, stateManager);
+ var producer = new Producer(correlationId, options.Topic, options.InitialSequenceId, _processManager, new NotReadyChannel(), executor, stateManager);
var process = new ProducerProcess(correlationId, stateManager, factory, producer);
_processManager.Add(process);
process.Start();