pass cancellation token to async lock
diff --git a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
index 3ce1b95..766e9ad 100644
--- a/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
+++ b/src/DotPulsar.Tests/Internal/AsyncLockTests.cs
@@ -29,7 +29,7 @@
var sut = new AsyncLock();
//Act
- var actual = sut.Lock();
+ var actual = sut.Lock(CancellationToken.None);
//Assert
Assert.True(actual.IsCompleted);
@@ -44,10 +44,10 @@
{
//Arrange
var sut = new AsyncLock();
- var alreadyTaken = await sut.Lock();
+ var alreadyTaken = await sut.Lock(CancellationToken.None);
//Act
- var actual = sut.Lock();
+ var actual = sut.Lock(CancellationToken.None);
//Assert
Assert.False(actual.IsCompleted);
@@ -66,7 +66,7 @@
await sut.DisposeAsync();
//Act
- var exception = await Record.ExceptionAsync(() => sut.Lock());
+ var exception = await Record.ExceptionAsync(() => sut.Lock(CancellationToken.None));
//Assert
Assert.IsType<ObjectDisposedException>(exception);
@@ -77,8 +77,8 @@
{
//Arrange
var sut = new AsyncLock();
- var gotLock = await sut.Lock();
- var awaiting = sut.Lock();
+ var gotLock = await sut.Lock(CancellationToken.None);
+ var awaiting = sut.Lock(CancellationToken.None);
_ = Task.Run(async () => await sut.DisposeAsync());
//Act
@@ -98,7 +98,7 @@
//Arrange
var cts = new CancellationTokenSource();
var sut = new AsyncLock();
- var gotLock = await sut.Lock();
+ var gotLock = await sut.Lock(CancellationToken.None);
var awaiting = sut.Lock(cts.Token);
//Act
@@ -119,7 +119,7 @@
{
//Arrange
var sut = new AsyncLock();
- var gotLock = await sut.Lock();
+ var gotLock = await sut.Lock(CancellationToken.None);
var disposeTask = Task.Run(async () => await sut.DisposeAsync());
Assert.False(disposeTask.IsCompleted);
diff --git a/src/DotPulsar/Internal/Abstractions/IConnection.cs b/src/DotPulsar/Internal/Abstractions/IConnection.cs
index 459300e..cce261d 100644
--- a/src/DotPulsar/Internal/Abstractions/IConnection.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConnection.cs
@@ -14,29 +14,30 @@
using DotPulsar.Internal.PulsarApi;
using System;
+using System.Threading;
using System.Threading.Tasks;
namespace DotPulsar.Internal.Abstractions
{
public interface IConnection : IAsyncDisposable
{
- ValueTask<bool> HasChannels();
+ ValueTask<bool> HasChannels(CancellationToken cancellationToken);
- Task<ProducerResponse> Send(CommandProducer command, IChannel channel);
- Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel);
+ Task<ProducerResponse> Send(CommandProducer command, IChannel channel, CancellationToken cancellationToken);
+ Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel, CancellationToken cancellationToken);
- Task Send(CommandPing command);
- Task Send(CommandPong command);
- Task Send(CommandAck command);
- Task Send(CommandFlow command);
+ Task Send(CommandPing command, CancellationToken cancellationToken);
+ Task Send(CommandPong command, CancellationToken cancellationToken);
+ Task Send(CommandAck command, CancellationToken cancellationToken);
+ Task Send(CommandFlow command, CancellationToken cancellationToken);
- Task<BaseCommand> Send(CommandUnsubscribe command);
- Task<BaseCommand> Send(CommandConnect command);
- Task<BaseCommand> Send(CommandLookupTopic command);
- Task<BaseCommand> Send(CommandSeek command);
- Task<BaseCommand> Send(CommandGetLastMessageId command);
- Task<BaseCommand> Send(CommandCloseProducer command);
- Task<BaseCommand> Send(CommandCloseConsumer command);
- Task<BaseCommand> Send(SendPackage command);
+ Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
+ Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken);
+ Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken);
+ Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken);
+ Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
+ Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken);
+ Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken);
+ Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
index dc05155..a2cbe34 100644
--- a/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IConsumerChannel.cs
@@ -21,10 +21,10 @@
{
public interface IConsumerChannel : IAsyncDisposable
{
- Task Send(CommandAck command);
- Task<CommandSuccess> Send(CommandUnsubscribe command);
- Task<CommandSuccess> Send(CommandSeek command);
- Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command);
- ValueTask<Message> Receive(CancellationToken cancellationToken = default);
+ Task Send(CommandAck command, CancellationToken cancellationToken);
+ Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken);
+ Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken);
+ Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken);
+ ValueTask<Message> Receive(CancellationToken cancellationToken);
}
}
diff --git a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
index 57276ce..f15fca2 100644
--- a/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
+++ b/src/DotPulsar/Internal/Abstractions/IProducerChannel.cs
@@ -15,13 +15,14 @@
using DotPulsar.Internal.PulsarApi;
using System;
using System.Buffers;
+using System.Threading;
using System.Threading.Tasks;
namespace DotPulsar.Internal.Abstractions
{
public interface IProducerChannel : IAsyncDisposable
{
- Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload);
- Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload);
+ Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
+ Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken);
}
}
\ No newline at end of file
diff --git a/src/DotPulsar/Internal/AsyncLock.cs b/src/DotPulsar/Internal/AsyncLock.cs
index a374518..de04523 100644
--- a/src/DotPulsar/Internal/AsyncLock.cs
+++ b/src/DotPulsar/Internal/AsyncLock.cs
@@ -36,7 +36,7 @@
_isDisposed = false;
}
- public Task<IDisposable> Lock(CancellationToken cancellationToken = default)
+ public Task<IDisposable> Lock(CancellationToken cancellationToken)
{
LinkedListNode<CancelableCompletionSource<IDisposable>>? node = null;
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index b14770a..eb27e11 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -15,6 +15,7 @@
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.Extensions;
using DotPulsar.Internal.PulsarApi;
+using System.Threading;
using System.Threading.Tasks;
namespace DotPulsar.Internal
@@ -36,19 +37,19 @@
_stream = stream;
}
- public async ValueTask<bool> HasChannels()
+ public async ValueTask<bool> HasChannels(CancellationToken cancellationToken)
{
- using (await _lock.Lock())
+ using (await _lock.Lock(cancellationToken))
{
return _channelManager.HasChannels();
}
}
- public async Task<ProducerResponse> Send(CommandProducer command, IChannel channel)
+ public async Task<ProducerResponse> Send(CommandProducer command, IChannel channel, CancellationToken cancellationToken)
{
Task<ProducerResponse>? responseTask = null;
- using (await _lock.Lock())
+ using (await _lock.Lock(cancellationToken))
{
var baseCommand = command.AsBaseCommand();
var requestResponseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -60,11 +61,11 @@
return await responseTask;
}
- public async Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel)
+ public async Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel, CancellationToken cancellationToken)
{
Task<SubscribeResponse>? responseTask = null;
- using (await _lock.Lock())
+ using (await _lock.Lock(cancellationToken))
{
var baseCommand = command.AsBaseCommand();
var requestResponseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -76,16 +77,23 @@
return await responseTask;
}
- public async Task Send(CommandPing command) => await Send(command.AsBaseCommand());
- public async Task Send(CommandPong command) => await Send(command.AsBaseCommand());
- public async Task Send(CommandAck command) => await Send(command.AsBaseCommand());
- public async Task Send(CommandFlow command) => await Send(command.AsBaseCommand());
+ public async Task Send(CommandPing command, CancellationToken cancellationToken) =>
+ await Send(command.AsBaseCommand(), cancellationToken);
- public async Task<BaseCommand> Send(CommandUnsubscribe command)
+ public async Task Send(CommandPong command, CancellationToken cancellationToken) =>
+ await Send(command.AsBaseCommand(), cancellationToken);
+
+ public async Task Send(CommandAck command, CancellationToken cancellationToken) =>
+ await Send(command.AsBaseCommand(), cancellationToken);
+
+ public async Task Send(CommandFlow command, CancellationToken cancellationToken) =>
+ await Send(command.AsBaseCommand(), cancellationToken);
+
+ public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
{
Task<BaseCommand>? responseTask = null;
- using (await _lock.Lock())
+ using (await _lock.Lock(cancellationToken))
{
var baseCommand = command.AsBaseCommand();
responseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -97,16 +105,23 @@
return await responseTask;
}
- public async Task<BaseCommand> Send(CommandConnect command) => await SendRequestResponse(command.AsBaseCommand());
- public async Task<BaseCommand> Send(CommandLookupTopic command) => await SendRequestResponse(command.AsBaseCommand());
- public async Task<BaseCommand> Send(CommandSeek command) => await SendRequestResponse(command.AsBaseCommand());
- public async Task<BaseCommand> Send(CommandGetLastMessageId command) => await SendRequestResponse(command.AsBaseCommand());
+ public async Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken) =>
+ await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
- public async Task<BaseCommand> Send(CommandCloseProducer command)
+ public async Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken) =>
+ await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+
+ public async Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken) =>
+ await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+
+ public async Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken) =>
+ await SendRequestResponse(command.AsBaseCommand(), cancellationToken);
+
+ public async Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken)
{
Task<BaseCommand>? responseTask = null;
- using (await _lock.Lock())
+ using (await _lock.Lock(cancellationToken))
{
var baseCommand = command.AsBaseCommand();
responseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -118,11 +133,11 @@
return await responseTask;
}
- public async Task<BaseCommand> Send(CommandCloseConsumer command)
+ public async Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken)
{
Task<BaseCommand>? responseTask = null;
- using (await _lock.Lock())
+ using (await _lock.Lock(cancellationToken))
{
var baseCommand = command.AsBaseCommand();
responseTask = _requestResponseHandler.Outgoing(baseCommand);
@@ -134,10 +149,10 @@
return await responseTask;
}
- public async Task<BaseCommand> Send(SendPackage command)
+ public async Task<BaseCommand> Send(SendPackage command, CancellationToken cancellationToken)
{
Task<BaseCommand>? response = null;
- using (await _lock.Lock())
+ using (await _lock.Lock(cancellationToken))
{
var baseCommand = command.Command.AsBaseCommand();
response = _requestResponseHandler.Outgoing(baseCommand);
@@ -147,10 +162,10 @@
return await response;
}
- private async Task<BaseCommand> SendRequestResponse(BaseCommand command)
+ private async Task<BaseCommand> SendRequestResponse(BaseCommand command, CancellationToken cancellationToken)
{
Task<BaseCommand>? response = null;
- using (await _lock.Lock())
+ using (await _lock.Lock(cancellationToken))
{
response = _requestResponseHandler.Outgoing(command);
var sequence = Serializer.Serialize(command);
@@ -159,16 +174,16 @@
return await response;
}
- private async Task Send(BaseCommand command)
+ private async Task Send(BaseCommand command, CancellationToken cancellationToken)
{
- using (await _lock.Lock())
+ using (await _lock.Lock(cancellationToken))
{
var sequence = Serializer.Serialize(command);
await _stream.Send(sequence);
}
}
- public async Task ProcessIncommingFrames()
+ public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
{
await Task.Yield();
@@ -197,7 +212,7 @@
_channelManager.Incoming(command.CloseProducer);
break;
case BaseCommand.Type.Ping:
- _pingPongHandler.Incoming(command.Ping);
+ _pingPongHandler.Incoming(command.Ping, cancellationToken);
break;
default:
_requestResponseHandler.Incoming(command);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index d3dff22..7c1f7b5 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -74,7 +74,7 @@
while (true)
{
var connection = await GetConnection(serviceUrl, cancellationToken);
- var response = await connection.Send(lookup);
+ var response = await connection.Send(lookup, cancellationToken);
response.Expect(BaseCommand.Type.LookupResponse);
if (response.LookupTopicResponse.Response == CommandLookupTopicResponse.LookupType.Failed)
@@ -124,18 +124,18 @@
if (_connections.TryGetValue(serviceUrl, out Connection connection))
return connection;
- return await EstablishNewConnection(serviceUrl);
+ return await EstablishNewConnection(serviceUrl, cancellationToken);
}
}
- private async Task<Connection> EstablishNewConnection(Uri serviceUrl)
+ private async Task<Connection> EstablishNewConnection(Uri serviceUrl, CancellationToken cancellationToken)
{
var stream = await _connector.Connect(serviceUrl);
var connection = new Connection(new PulsarStream(stream));
DotPulsarEventSource.Log.ConnectionCreated();
_connections[serviceUrl] = connection;
- _ = connection.ProcessIncommingFrames().ContinueWith(t => DisposeConnection(serviceUrl));
- var response = await connection.Send(_commandConnect);
+ _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(serviceUrl));
+ var response = await connection.Send(_commandConnect, cancellationToken);
response.Expect(BaseCommand.Type.Connected);
return connection;
}
@@ -165,7 +165,7 @@
var connection = _connections[serviceUrl];
if (connection is null)
continue;
- if (!await connection.HasChannels())
+ if (!await connection.HasChannels(cancellationToken))
await DisposeConnection(serviceUrl);
}
}
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 440961d..9b623ad 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -96,21 +96,21 @@
public async ValueTask Unsubscribe(CancellationToken cancellationToken)
{
ThrowIfDisposed();
- _ = await _executor.Execute(() => _channel.Send(new CommandUnsubscribe()), cancellationToken);
+ _ = await _executor.Execute(() => _channel.Send(new CommandUnsubscribe(), cancellationToken), cancellationToken);
}
public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
{
ThrowIfDisposed();
var seek = new CommandSeek { MessageId = messageId.Data };
- _ = await _executor.Execute(() => _channel.Send(seek), cancellationToken);
+ _ = await _executor.Execute(() => _channel.Send(seek, cancellationToken), cancellationToken);
return;
}
public async ValueTask<MessageId> GetLastMessageId(CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId()), cancellationToken);
+ var response = await _executor.Execute(() => _channel.Send(new CommandGetLastMessageId(), cancellationToken), cancellationToken);
return new MessageId(response.LastMessageId);
}
@@ -122,7 +122,7 @@
_cachedCommandAck.Type = ackType;
_cachedCommandAck.MessageIds.Clear();
_cachedCommandAck.MessageIds.Add(messageIdData);
- return _channel.Send(_cachedCommandAck);
+ return _channel.Send(_cachedCommandAck, cancellationToken);
}, cancellationToken);
}
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index 9373da1..735b078 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -52,7 +52,7 @@
while (true)
{
if (_sendWhenZero == 0)
- await SendFlow();
+ await SendFlow(cancellationToken);
_sendWhenZero--;
@@ -64,7 +64,7 @@
if (!messagePackage.IsValid())
{
- await RejectPackage(messagePackage);
+ await RejectPackage(messagePackage, cancellationToken);
continue;
}
@@ -79,7 +79,7 @@
}
}
- public async Task Send(CommandAck command)
+ public async Task Send(CommandAck command, CancellationToken cancellationToken)
{
var messageId = command.MessageIds[0];
if (messageId.BatchIndex != -1)
@@ -92,30 +92,30 @@
}
command.ConsumerId = _id;
- await _connection.Send(command);
+ await _connection.Send(command, cancellationToken);
}
- public async Task<CommandSuccess> Send(CommandUnsubscribe command)
+ public async Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken)
{
command.ConsumerId = _id;
- var response = await _connection.Send(command);
+ var response = await _connection.Send(command, cancellationToken);
response.Expect(BaseCommand.Type.Success);
return response.Success;
}
- public async Task<CommandSuccess> Send(CommandSeek command)
+ public async Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken)
{
command.ConsumerId = _id;
- var response = await _connection.Send(command);
+ var response = await _connection.Send(command, cancellationToken);
response.Expect(BaseCommand.Type.Success);
_batchHandler.Clear();
return response.Success;
}
- public async Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command)
+ public async Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken)
{
command.ConsumerId = _id;
- var response = await _connection.Send(command);
+ var response = await _connection.Send(command, cancellationToken);
response.Expect(BaseCommand.Type.GetLastMessageIdResponse);
return response.GetLastMessageIdResponse;
}
@@ -125,7 +125,7 @@
try
{
_queue.Dispose();
- await _connection.Send(new CommandCloseConsumer { ConsumerId = _id });
+ await _connection.Send(new CommandCloseConsumer { ConsumerId = _id }, CancellationToken.None);
}
catch
{
@@ -133,9 +133,10 @@
}
}
- private async ValueTask SendFlow()
+ private async ValueTask SendFlow(CancellationToken cancellationToken)
{
- await _connection.Send(_cachedCommandFlow); //TODO Should sending the flow command be handled on another thread and thereby not slow down the consumer?
+ //TODO Should sending the flow command be handled on another thread and thereby not slow down the consumer?
+ await _connection.Send(_cachedCommandFlow, cancellationToken);
if (_firstFlow)
{
@@ -146,7 +147,7 @@
_sendWhenZero = _cachedCommandFlow.MessagePermits;
}
- private async Task RejectPackage(MessagePackage messagePackage)
+ private async Task RejectPackage(MessagePackage messagePackage, CancellationToken cancellationToken)
{
var ack = new CommandAck
{
@@ -156,7 +157,7 @@
ack.MessageIds.Add(messagePackage.MessageId);
- await Send(ack);
+ await Send(ack, cancellationToken);
}
}
}
diff --git a/src/DotPulsar/Internal/ConsumerChannelFactory.cs b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
index 5424c32..9746c67 100644
--- a/src/DotPulsar/Internal/ConsumerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ConsumerChannelFactory.cs
@@ -65,7 +65,7 @@
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
- var response = await connection.Send(_subscribe, channel);
+ var response = await connection.Send(_subscribe, channel, cancellationToken);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
}
}
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index 4dc0ce9..ca958b7 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -28,17 +28,18 @@
public ValueTask<Message> Receive(CancellationToken cancellationToken = default) => throw GetException();
- public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload) => throw GetException();
+ public Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken) => throw GetException();
- public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload) => throw GetException();
+ public Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken) =>
+ throw GetException();
- public Task Send(CommandAck command) => throw GetException();
+ public Task Send(CommandAck command, CancellationToken cancellationToken) => throw GetException();
- public Task<CommandSuccess> Send(CommandUnsubscribe command) => throw GetException();
+ public Task<CommandSuccess> Send(CommandUnsubscribe command, CancellationToken cancellationToken) => throw GetException();
- public Task<CommandSuccess> Send(CommandSeek command) => throw GetException();
+ public Task<CommandSuccess> Send(CommandSeek command, CancellationToken cancellationToken) => throw GetException();
- public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command) => throw GetException();
+ public Task<CommandGetLastMessageIdResponse> Send(CommandGetLastMessageId command, CancellationToken cancellationToken) => throw GetException();
private Exception GetException() => new ChannelNotReadyException();
}
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs
index a0e0c23..8b16349 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -12,6 +12,7 @@
* limitations under the License.
*/
+using System.Threading;
using DotPulsar.Internal.Abstractions;
using DotPulsar.Internal.PulsarApi;
@@ -28,9 +29,9 @@
_pong = new CommandPong();
}
- public void Incoming(CommandPing ping)
+ public void Incoming(CommandPing ping, CancellationToken cancellationToken)
{
- _ = _connection.Send(_pong);
+ _ = _connection.Send(_pong, cancellationToken);
}
}
}
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index f5750f1..2d850cf 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -76,7 +76,7 @@
public async ValueTask<MessageId> Send(ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(data), cancellationToken);
+ var response = await _executor.Execute(() => _channel.Send(data, cancellationToken), cancellationToken);
return new MessageId(response.MessageId);
}
@@ -89,7 +89,7 @@
public async ValueTask<MessageId> Send(MessageMetadata metadata, ReadOnlySequence<byte> data, CancellationToken cancellationToken)
{
ThrowIfDisposed();
- var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data), cancellationToken);
+ var response = await _executor.Execute(() => _channel.Send(metadata.Metadata, data, cancellationToken), cancellationToken);
return new MessageId(response.MessageId);
}
diff --git a/src/DotPulsar/Internal/ProducerChannel.cs b/src/DotPulsar/Internal/ProducerChannel.cs
index d9dc561..49b70a9 100644
--- a/src/DotPulsar/Internal/ProducerChannel.cs
+++ b/src/DotPulsar/Internal/ProducerChannel.cs
@@ -17,6 +17,7 @@
using DotPulsar.Internal.PulsarApi;
using System;
using System.Buffers;
+using System.Threading;
using System.Threading.Tasks;
namespace DotPulsar.Internal
@@ -53,7 +54,7 @@
{
try
{
- await _connection.Send(new CommandCloseProducer { ProducerId = _id });
+ await _connection.Send(new CommandCloseProducer { ProducerId = _id }, CancellationToken.None);
}
catch
{
@@ -61,22 +62,22 @@
}
}
- public async Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload)
+ public async Task<CommandSendReceipt> Send(ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
_cachedSendPackage.Metadata = _cachedMetadata;
_cachedSendPackage.Payload = payload;
- return await SendPackage(true);
+ return await SendPackage(true, cancellationToken);
}
- public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload)
+ public async Task<CommandSendReceipt> Send(PulsarApi.MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
{
metadata.ProducerName = _cachedMetadata.ProducerName;
_cachedSendPackage.Metadata = metadata;
_cachedSendPackage.Payload = payload;
- return await SendPackage(metadata.SequenceId == 0);
+ return await SendPackage(metadata.SequenceId == 0, cancellationToken);
}
- private async Task<CommandSendReceipt> SendPackage(bool autoAssignSequenceId)
+ private async Task<CommandSendReceipt> SendPackage(bool autoAssignSequenceId, CancellationToken cancellationToken)
{
try
{
@@ -90,7 +91,7 @@
else
_cachedSendPackage.Command.SequenceId = _cachedSendPackage.Metadata.SequenceId;
- var response = await _connection.Send(_cachedSendPackage);
+ var response = await _connection.Send(_cachedSendPackage, cancellationToken);
response.Expect(BaseCommand.Type.SendReceipt);
if (autoAssignSequenceId)
diff --git a/src/DotPulsar/Internal/ProducerChannelFactory.cs b/src/DotPulsar/Internal/ProducerChannelFactory.cs
index da31a54..7426c22 100644
--- a/src/DotPulsar/Internal/ProducerChannelFactory.cs
+++ b/src/DotPulsar/Internal/ProducerChannelFactory.cs
@@ -56,7 +56,7 @@
{
var connection = await _connectionPool.FindConnectionForTopic(_commandProducer.Topic, cancellationToken);
var channel = new Channel(_correlationId, _eventRegister, new AsyncQueue<MessagePackage>());
- var response = await connection.Send(_commandProducer, channel);
+ var response = await connection.Send(_commandProducer, channel, cancellationToken);
return new ProducerChannel(response.ProducerId, response.ProducerName, _sequenceId, connection);
}
}
diff --git a/src/DotPulsar/Internal/ReaderChannelFactory.cs b/src/DotPulsar/Internal/ReaderChannelFactory.cs
index e95687e..e140854 100644
--- a/src/DotPulsar/Internal/ReaderChannelFactory.cs
+++ b/src/DotPulsar/Internal/ReaderChannelFactory.cs
@@ -64,7 +64,7 @@
var connection = await _connectionPool.FindConnectionForTopic(_subscribe.Topic, cancellationToken);
var messageQueue = new AsyncQueue<MessagePackage>();
var channel = new Channel(_correlationId, _eventRegister, messageQueue);
- var response = await connection.Send(_subscribe, channel);
+ var response = await connection.Send(_subscribe, channel, cancellationToken);
return new ConsumerChannel(response.ConsumerId, _messagePrefetchCount, messageQueue, connection, _batchHandler);
}
}