Fixing minor stuff
diff --git a/samples/Consuming/Program.cs b/samples/Consuming/Program.cs
index 80bde82..1570efe 100644
--- a/samples/Consuming/Program.cs
+++ b/samples/Consuming/Program.cs
@@ -25,7 +25,7 @@
internal static class Program
{
- private static async Task Main(string[] args)
+ private static async Task Main()
{
const string myTopic = "persistent://public/default/mytopic";
diff --git a/samples/Producing/Program.cs b/samples/Producing/Program.cs
index ceb74a5..a69bc38 100644
--- a/samples/Producing/Program.cs
+++ b/samples/Producing/Program.cs
@@ -24,7 +24,7 @@
internal static class Program
{
- private static async Task Main(string[] args)
+ private static async Task Main()
{
const string myTopic = "persistent://public/default/mytopic";
diff --git a/samples/Reading/Program.cs b/samples/Reading/Program.cs
index b867e4a..8a2f0fe 100644
--- a/samples/Reading/Program.cs
+++ b/samples/Reading/Program.cs
@@ -25,7 +25,7 @@
internal static class Program
{
- private static async Task Main(string[] args)
+ private static async Task Main()
{
const string myTopic = "persistent://public/default/mytopic";
diff --git a/src/DotPulsar/Internal/ChunkingPipeline.cs b/src/DotPulsar/Internal/ChunkingPipeline.cs
index 74ecf31..2768f69 100644
--- a/src/DotPulsar/Internal/ChunkingPipeline.cs
+++ b/src/DotPulsar/Internal/ChunkingPipeline.cs
@@ -86,7 +86,11 @@
{
if (_bufferCount != 0)
{
+#if NETSTANDARD2_0
await _stream.WriteAsync(_buffer, 0, _bufferCount).ConfigureAwait(false);
+#else
+ await _stream.WriteAsync(_buffer.AsMemory(0, _bufferCount)).ConfigureAwait(false);
+#endif
_bufferCount = 0;
}
}
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index 7829173..02280dd 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -18,7 +18,6 @@
using Exceptions;
using Extensions;
using PulsarApi;
- using System;
using System.Buffers;
using System.Threading;
using System.Threading.Tasks;
@@ -172,19 +171,13 @@
{
ThrowIfDisposed();
- if (command.Command is null)
- throw new ArgumentNullException(nameof(command.Command));
-
- if (command.Metadata is null)
- throw new ArgumentNullException(nameof(command.Metadata));
-
Task<BaseCommand>? response;
using (await _lock.Lock(cancellationToken).ConfigureAwait(false))
{
- var baseCommand = command.Command.AsBaseCommand();
+ var baseCommand = command.Command!.AsBaseCommand();
response = _requestResponseHandler.Outgoing(baseCommand);
- var sequence = Serializer.Serialize(baseCommand, command.Metadata, command.Payload);
+ var sequence = Serializer.Serialize(baseCommand, command.Metadata!, command.Payload);
await _stream.Send(sequence).ConfigureAwait(false);
}
@@ -218,7 +211,7 @@
}
}
- public async Task ProcessIncommingFrames(CancellationToken cancellationToken)
+ public async Task ProcessIncommingFrames()
{
await Task.Yield();
@@ -247,7 +240,7 @@
_channelManager.Incoming(command.CloseProducer);
break;
case BaseCommand.Type.Ping:
- _pingPongHandler.Incoming(command.Ping, cancellationToken);
+ _pingPongHandler.GotPing();
break;
default:
_requestResponseHandler.Incoming(command);
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index c394cfc..f6eeb60 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -154,7 +154,7 @@
var connection = new Connection(new PulsarStream(stream));
DotPulsarEventSource.Log.ConnectionCreated();
_connections[url] = connection;
- _ = connection.ProcessIncommingFrames(cancellationToken).ContinueWith(t => DisposeConnection(url));
+ _ = connection.ProcessIncommingFrames().ContinueWith(t => DisposeConnection(url));
var commandConnect = _commandConnect;
if (url.ProxyThroughServiceUrl)
diff --git a/src/DotPulsar/Internal/Connector.cs b/src/DotPulsar/Internal/Connector.cs
index 9fc42c3..52b2774 100644
--- a/src/DotPulsar/Internal/Connector.cs
+++ b/src/DotPulsar/Internal/Connector.cs
@@ -60,7 +60,7 @@
return stream;
}
- private async Task<Stream> GetStream(string host, int port)
+ private static async Task<Stream> GetStream(string host, int port)
{
var tcpClient = new TcpClient();
diff --git a/src/DotPulsar/Internal/DefaultExceptionHandler.cs b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
index b4f26d8..a783827 100644
--- a/src/DotPulsar/Internal/DefaultExceptionHandler.cs
+++ b/src/DotPulsar/Internal/DefaultExceptionHandler.cs
@@ -39,7 +39,7 @@
exceptionContext.ExceptionHandled = true;
}
- private FaultAction DetermineFaultAction(Exception exception, CancellationToken cancellationToken)
+ private static FaultAction DetermineFaultAction(Exception exception, CancellationToken cancellationToken)
=> exception switch
{
TooManyRequestsException _ => FaultAction.Retry,
diff --git a/src/DotPulsar/Internal/DotPulsarEventSource.cs b/src/DotPulsar/Internal/DotPulsarEventSource.cs
index 7f301f6..e31f5a3 100644
--- a/src/DotPulsar/Internal/DotPulsarEventSource.cs
+++ b/src/DotPulsar/Internal/DotPulsarEventSource.cs
@@ -46,6 +46,7 @@
public sealed class DotPulsarEventSource : EventSource
{
+#pragma warning disable IDE0052 // Remove unread private members
private PollingCounter? _totalClientsCounter;
private long _totalClients;
@@ -75,6 +76,7 @@
private PollingCounter? _currentReadersCounter;
private long _currentReaders;
+#pragma warning restore IDE0052 // Remove unread private members
public static readonly DotPulsarEventSource Log = new DotPulsarEventSource();
diff --git a/src/DotPulsar/Internal/NotReadyChannel.cs b/src/DotPulsar/Internal/NotReadyChannel.cs
index 5590138..e463ff6 100644
--- a/src/DotPulsar/Internal/NotReadyChannel.cs
+++ b/src/DotPulsar/Internal/NotReadyChannel.cs
@@ -33,9 +33,6 @@
public ValueTask<Message> Receive(CancellationToken cancellationToken = default)
=> throw GetException();
- public Task<CommandSendReceipt> Send(ulong sequenceId, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
- => throw GetException();
-
public Task<CommandSendReceipt> Send(MessageMetadata metadata, ReadOnlySequence<byte> payload, CancellationToken cancellationToken)
=> throw GetException();
diff --git a/src/DotPulsar/Internal/PingPongHandler.cs b/src/DotPulsar/Internal/PingPongHandler.cs
index 9e39cc9..da406aa 100644
--- a/src/DotPulsar/Internal/PingPongHandler.cs
+++ b/src/DotPulsar/Internal/PingPongHandler.cs
@@ -16,7 +16,6 @@
{
using Abstractions;
using PulsarApi;
- using System.Threading;
using System.Threading.Tasks;
public sealed class PingPongHandler
@@ -30,14 +29,14 @@
_pong = new CommandPong();
}
- public void Incoming(CommandPing ping, CancellationToken cancellationToken)
- => Task.Factory.StartNew(() => SendPong(cancellationToken));
+ public void GotPing()
+ => Task.Factory.StartNew(() => SendPong());
- private async Task SendPong(CancellationToken cancellationToken)
+ private async Task SendPong()
{
try
{
- await _connection.Send(_pong, cancellationToken).ConfigureAwait(false);
+ await _connection.Send(_pong, default).ConfigureAwait(false);
}
catch { }
}
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 867f50f..2fc5eb0 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -54,19 +54,23 @@
await _pipeline.Send(sequence).ConfigureAwait(false);
}
-#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
+#if NETSTANDARD2_0
+ public ValueTask DisposeAsync()
+ {
+ if (Interlocked.Exchange(ref _isDisposed, 1) == 0)
+ _stream.Dispose();
+
+ return new ValueTask();
+ }
+#else
public async ValueTask DisposeAsync()
-#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
-#if NETSTANDARD2_0
- _stream.Dispose();
-#else
await _stream.DisposeAsync().ConfigureAwait(false);
-#endif
}
+#endif
private async Task FillPipe(CancellationToken cancellationToken)
{
diff --git a/src/DotPulsar/MessageId.cs b/src/DotPulsar/MessageId.cs
index b50c2dc..d8d5aef 100644
--- a/src/DotPulsar/MessageId.cs
+++ b/src/DotPulsar/MessageId.cs
@@ -102,7 +102,7 @@
=> x is not null ? x.CompareTo(y) >= 0 : y is null;
public static bool operator <=(MessageId x, MessageId y)
- => x is not null ? x.CompareTo(y) <= 0 : true;
+ => x is null || x.CompareTo(y) <= 0;
public override bool Equals(object? o)
=> o is MessageId id && Equals(id);
diff --git a/src/DotPulsar/MessageMetadata.cs b/src/DotPulsar/MessageMetadata.cs
index fb7f632..23e0c7b 100644
--- a/src/DotPulsar/MessageMetadata.cs
+++ b/src/DotPulsar/MessageMetadata.cs
@@ -26,7 +26,7 @@
public MessageMetadata()
=> Metadata = new Internal.PulsarApi.MessageMetadata();
- internal readonly Internal.PulsarApi.MessageMetadata Metadata;
+ internal Internal.PulsarApi.MessageMetadata Metadata { get; }
/// <summary>
/// The delivery time of the message as unix time in milliseconds.
diff --git a/tests/DotPulsar.StressTests/ConsumerTests.cs b/tests/DotPulsar.StressTests/ConsumerTests.cs
index fa32611..2bc3421 100644
--- a/tests/DotPulsar.StressTests/ConsumerTests.cs
+++ b/tests/DotPulsar.StressTests/ConsumerTests.cs
@@ -70,7 +70,7 @@
consumed.Should().BeEquivalentTo(produced);
}
- private async Task<IEnumerable<MessageId>> ProduceMessages(IProducer producer, int numberOfMessages, CancellationToken ct)
+ private static async Task<IEnumerable<MessageId>> ProduceMessages(IProducer producer, int numberOfMessages, CancellationToken ct)
{
var messageIds = new MessageId[numberOfMessages];
@@ -83,7 +83,7 @@
return messageIds;
}
- private async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer consumer, int numberOfMessages, CancellationToken ct)
+ private static async Task<IEnumerable<MessageId>> ConsumeMessages(IConsumer consumer, int numberOfMessages, CancellationToken ct)
{
var messageIds = new List<MessageId>(numberOfMessages);
diff --git a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
index 7a7d23a..d460de3 100644
--- a/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
+++ b/tests/DotPulsar.StressTests/EnumerableTaskExtensions.cs
@@ -13,7 +13,6 @@
*/
#pragma warning disable 8601
-#pragma warning disable 8625
namespace DotPulsar.StressTests
{
diff --git a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
index 8263594..332a94e 100644
--- a/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
+++ b/tests/DotPulsar.Tests/Internal/AsyncQueueTests.cs
@@ -110,13 +110,13 @@
CancellationTokenSource source1 = new CancellationTokenSource(), source2 = new CancellationTokenSource();
const int excepted = 1;
var queue = new AsyncQueue<int>();
- var task1 = queue.Dequeue(source1.Token);
- var task2 = queue.Dequeue(source2.Token);
+ var task1 = queue.Dequeue(source1.Token).AsTask();
+ var task2 = queue.Dequeue(source2.Token).AsTask();
//Act
source1.Cancel();
queue.Enqueue(excepted);
- var exception = await Record.ExceptionAsync(() => task1.AsTask()).ConfigureAwait(false); // xUnit can't record ValueTask yet
+ var exception = await Record.ExceptionAsync(() => task1).ConfigureAwait(false);
await task2.ConfigureAwait(false);
//Assert
diff --git a/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
index de3895b..65ab5f5 100644
--- a/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
+++ b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
@@ -91,7 +91,8 @@
private static int GetNumberOfSegments(ReadOnlySequence<byte> sequence)
{
var numberOfSegments = 0;
- foreach (var segment in sequence)
+ var enumerator = sequence.GetEnumerator();
+ while (enumerator.MoveNext())
++numberOfSegments;
return numberOfSegments;
}