Code cleanup
diff --git a/src/DotPulsar/Extensions/ProducerExtensions.cs b/src/DotPulsar/Extensions/ProducerExtensions.cs
index d292468..130941e 100644
--- a/src/DotPulsar/Extensions/ProducerExtensions.cs
+++ b/src/DotPulsar/Extensions/ProducerExtensions.cs
@@ -54,7 +54,7 @@
/// <remarks>
/// If the state change to a final state, then all awaiting tasks will complete.
/// </remarks>
- public static async ValueTask<ProducerStateChanged> StateChangedFrom(this IProducer producer, ProducerState state, CancellationToken cancellationToken = default)
+ public static async ValueTask<ProducerStateChanged> StateChangedFrom(this IProducer producer, ProducerState state, CancellationToken cancellationToken = default)
{
var newState = await producer.OnStateChangeFrom(state, cancellationToken).ConfigureAwait(false);
return new ProducerStateChanged(producer, newState);
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index 8f61858..b28ba5b 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -137,7 +137,7 @@
_ = response.ContinueWith(result =>
{
if (result.Result.CommandType == BaseCommand.Type.Success)
- _consumerChannels.Remove(consumerId)?.Unsubscribed();
+ _consumerChannels.Remove(consumerId)?.Unsubscribed();
}, TaskContinuationOptions.OnlyOnRanToCompletion);
return response;
diff --git a/src/DotPulsar/Internal/Compression/Lz4Compression.cs b/src/DotPulsar/Internal/Compression/Lz4Compression.cs
index 1411bf1..d528fb7 100644
--- a/src/DotPulsar/Internal/Compression/Lz4Compression.cs
+++ b/src/DotPulsar/Internal/Compression/Lz4Compression.cs
@@ -50,7 +50,7 @@
return true;
}
catch
- {
+ {
// Ignore
}
diff --git a/src/DotPulsar/Internal/Compression/ZstdCompression.cs b/src/DotPulsar/Internal/Compression/ZstdCompression.cs
index fd5f281..c9f5f20 100644
--- a/src/DotPulsar/Internal/Compression/ZstdCompression.cs
+++ b/src/DotPulsar/Internal/Compression/ZstdCompression.cs
@@ -83,9 +83,9 @@
if (type.FullName is null || !type.FullName.Equals(fullName))
continue;
- if (type.IsPublic &&
- type.IsClass &&
- !type.IsAbstract &&
+ if (type.IsPublic &&
+ type.IsClass &&
+ !type.IsAbstract &&
type.ImplementedInterfaces.Contains(typeof(IDisposable)) &&
type.GetConstructor(Type.EmptyTypes) is not null)
return type;
diff --git a/src/DotPulsar/Internal/ConnectionPool.cs b/src/DotPulsar/Internal/ConnectionPool.cs
index b98a832..1eaf96f 100644
--- a/src/DotPulsar/Internal/ConnectionPool.cs
+++ b/src/DotPulsar/Internal/ConnectionPool.cs
@@ -140,7 +140,7 @@
private ValueTask<Connection> GetConnection(Uri serviceUrl, CancellationToken cancellationToken)
{
- return GetConnection(new PulsarUrl(serviceUrl,serviceUrl), cancellationToken);
+ return GetConnection(new PulsarUrl(serviceUrl, serviceUrl), cancellationToken);
}
private async ValueTask<Connection> GetConnection(PulsarUrl url, CancellationToken cancellationToken)
diff --git a/src/DotPulsar/Internal/Consumer.cs b/src/DotPulsar/Internal/Consumer.cs
index 8b1b46b..3d50ead 100644
--- a/src/DotPulsar/Internal/Consumer.cs
+++ b/src/DotPulsar/Internal/Consumer.cs
@@ -127,7 +127,7 @@
}
private async ValueTask Unsubscribe(CommandUnsubscribe command, CancellationToken cancellationToken)
- =>await _channel.Send(command, cancellationToken).ConfigureAwait(false);
+ => await _channel.Send(command, cancellationToken).ConfigureAwait(false);
public async ValueTask Seek(MessageId messageId, CancellationToken cancellationToken)
{
diff --git a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
index 9cea7cc..642a103 100644
--- a/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/CommandExtensions.cs
@@ -176,7 +176,7 @@
CommandType = BaseCommand.Type.Seek,
Seek = command
};
-
+
public static BaseCommand AsBaseCommand(this CommandRedeliverUnacknowledgedMessages command)
=> new()
{
@@ -194,7 +194,8 @@
public static BaseCommand AsBaseCommand(this CommandPartitionedTopicMetadata command)
=> new()
{
- CommandType = BaseCommand.Type.PartitionedMetadata, PartitionMetadata = command
+ CommandType = BaseCommand.Type.PartitionedMetadata,
+ PartitionMetadata = command
};
}
}
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 081a818..0584b7f 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -59,7 +59,7 @@
{
if (Interlocked.Exchange(ref _isDisposed, 1) == 0)
_stream.Dispose();
-
+
return new ValueTask();
}
#else
diff --git a/src/DotPulsar/Internal/Requests/StandardRequest.cs b/src/DotPulsar/Internal/Requests/StandardRequest.cs
index 69191a2..829baff 100644
--- a/src/DotPulsar/Internal/Requests/StandardRequest.cs
+++ b/src/DotPulsar/Internal/Requests/StandardRequest.cs
@@ -41,7 +41,7 @@
=> new(requestId, consumerId, null, commandType);
public static StandardRequest WithProducerId(ulong requestId, ulong producerId, BaseCommand.Type? commandType = null)
- => new (requestId, null, producerId, commandType);
+ => new(requestId, null, producerId, commandType);
public bool SenderIsConsumer(ulong consumerId)
=> _consumerId.HasValue && _consumerId.Value == consumerId;
diff --git a/src/DotPulsar/SinglePartitionRouter.cs b/src/DotPulsar/SinglePartitionRouter.cs
index e117653..44e80eb 100644
--- a/src/DotPulsar/SinglePartitionRouter.cs
+++ b/src/DotPulsar/SinglePartitionRouter.cs
@@ -51,7 +51,7 @@
var keyBytes = messageMetadata.KeyBytes;
if (keyBytes is not null && keyBytes.Length > 0)
return (int) MurmurHash3.Hash32(keyBytes, 0) % numberOfPartitions;
-
+
if (_partitionIndex == -1)
_partitionIndex = new Random().Next(0, numberOfPartitions);
diff --git a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs b/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
index 820cecd..87daec4 100644
--- a/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
+++ b/tests/DotPulsar.IntegrationTests/Services/StandaloneExternalService.cs
@@ -19,7 +19,7 @@
public sealed class StandaloneExternalService : PulsarServiceBase
{
public override Uri GetBrokerUri()
- => new ("pulsar://localhost:6650");
+ => new("pulsar://localhost:6650");
public override Uri GetWebServiceUri()
=> new("http://localhost:8080");