Auto clean up
diff --git a/src/DotPulsar.Stress.Tests/ConsumerTests.cs b/src/DotPulsar.Stress.Tests/ConsumerTests.cs
index cdfe6ab..2517413 100644
--- a/src/DotPulsar.Stress.Tests/ConsumerTests.cs
+++ b/src/DotPulsar.Stress.Tests/ConsumerTests.cs
@@ -12,14 +12,14 @@
* limitations under the License.
*/
+using DotPulsar.Extensions;
+using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
-using DotPulsar.Extensions;
-using FluentAssertions;
using Xunit;
using Xunit.Abstractions;
diff --git a/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs b/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs
index 95a9fd8..d8fdde3 100644
--- a/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs
+++ b/src/DotPulsar.Stress.Tests/XunitExceptionHandler.cs
@@ -12,10 +12,10 @@
* limitations under the License.
*/
-using System;
-using System.Threading.Tasks;
using DotPulsar.Abstractions;
using DotPulsar.Internal;
+using System;
+using System.Threading.Tasks;
using Xunit.Abstractions;
namespace DotPulsar.Stress.Tests
diff --git a/src/DotPulsar/Internal/ChannelManager.cs b/src/DotPulsar/Internal/ChannelManager.cs
index 48c6048..d03e99d 100644
--- a/src/DotPulsar/Internal/ChannelManager.cs
+++ b/src/DotPulsar/Internal/ChannelManager.cs
@@ -142,7 +142,7 @@
if (consumer != null)
consumer.Received(new MessagePackage(command.MessageId, data));
}
-
+
public void Dispose()
{
foreach (var channel in _consumerChannels.RemoveAll())
diff --git a/src/DotPulsar/Internal/ConsumerChannel.cs b/src/DotPulsar/Internal/ConsumerChannel.cs
index d4ab7cc..9373da1 100644
--- a/src/DotPulsar/Internal/ConsumerChannel.cs
+++ b/src/DotPulsar/Internal/ConsumerChannel.cs
@@ -69,9 +69,9 @@
}
var metadataSize = messagePackage.GetMetadataSize();
- var data = messagePackage.ExtractData(metadataSize);
- var metadata = messagePackage.ExtractMetadata(metadataSize);
- var messageId = messagePackage.MessageId;
+ var data = messagePackage.ExtractData(metadataSize);
+ var metadata = messagePackage.ExtractMetadata(metadataSize);
+ var messageId = messagePackage.MessageId;
return metadata.NumMessagesInBatch == 1
? new Message(new MessageId(messageId), metadata, null, data)
@@ -148,11 +148,11 @@
private async Task RejectPackage(MessagePackage messagePackage)
{
- var ack = new CommandAck
- {
- Type = CommandAck.AckType.Individual,
- validation_error = CommandAck.ValidationError.ChecksumMismatch
- };
+ var ack = new CommandAck
+ {
+ Type = CommandAck.AckType.Individual,
+ validation_error = CommandAck.ValidationError.ChecksumMismatch
+ };
ack.MessageIds.Add(messagePackage.MessageId);
diff --git a/src/DotPulsar/Internal/Executor.cs b/src/DotPulsar/Internal/Executor.cs
index e5e82c2..1260cf3 100644
--- a/src/DotPulsar/Internal/Executor.cs
+++ b/src/DotPulsar/Internal/Executor.cs
@@ -151,7 +151,7 @@
return true;
var context = new ExceptionContext(exception, cancellationToken);
-
+
await _exceptionHandler.OnException(context);
if (context.Result != FaultAction.Retry)
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 91fa354..f5750f1 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -62,7 +62,7 @@
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
-
+
_eventRegister.Register(new ProducerDisposed(_correlationId, this));
await _channel.DisposeAsync();
}
diff --git a/src/DotPulsar/Internal/ProducerProcess.cs b/src/DotPulsar/Internal/ProducerProcess.cs
index 2338326..ff50d83 100644
--- a/src/DotPulsar/Internal/ProducerProcess.cs
+++ b/src/DotPulsar/Internal/ProducerProcess.cs
@@ -47,7 +47,7 @@
if (_producer.IsFinalState())
return;
- if(ExecutorState == ExecutorState.Faulted)
+ if (ExecutorState == ExecutorState.Faulted)
{
_stateManager.SetState(ProducerState.Faulted);
return;
diff --git a/src/DotPulsar/Internal/Reader.cs b/src/DotPulsar/Internal/Reader.cs
index 302cfa2..f444e3b 100644
--- a/src/DotPulsar/Internal/Reader.cs
+++ b/src/DotPulsar/Internal/Reader.cs
@@ -73,7 +73,7 @@
{
if (Interlocked.Exchange(ref _isDisposed, 1) != 0)
return;
-
+
_eventRegister.Register(new ReaderDisposed(_correlationId, this));
await _channel.DisposeAsync();
}