Adding catches to ignore exceptions from the "internal" tasks.
diff --git a/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj b/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj
index 5270c0b..d04a824 100644
--- a/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj
+++ b/src/DotPulsar.Stress.Tests/DotPulsar.Stress.Tests.csproj
@@ -6,11 +6,17 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="FluentAssertions" Version="5.10.2" />
- <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
<PackageReference Include="xunit" Version="2.4.1" />
- <PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
- <PackageReference Include="coverlet.collector" Version="1.0.1" />
+ <PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ <PackageReference Include="coverlet.collector" Version="1.2.0">
+ <PrivateAssets>all</PrivateAssets>
+ <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
+ </PackageReference>
+ <PackageReference Include="FluentAssertions" Version="5.10.2" />
</ItemGroup>
<ItemGroup>
diff --git a/src/DotPulsar/Internal/Connection.cs b/src/DotPulsar/Internal/Connection.cs
index ecf73a1..b14770a 100644
--- a/src/DotPulsar/Internal/Connection.cs
+++ b/src/DotPulsar/Internal/Connection.cs
@@ -172,36 +172,40 @@
{
await Task.Yield();
- await foreach (var frame in _stream.Frames())
+ try
{
- var commandSize = frame.ReadUInt32(0, true);
- var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
-
- switch (command.CommandType)
+ await foreach (var frame in _stream.Frames())
{
- case BaseCommand.Type.Message:
- _channelManager.Incoming(command.Message, frame.Slice(commandSize + 4));
- break;
- case BaseCommand.Type.CloseConsumer:
- _channelManager.Incoming(command.CloseConsumer);
- break;
- case BaseCommand.Type.ActiveConsumerChange:
- _channelManager.Incoming(command.ActiveConsumerChange);
- break;
- case BaseCommand.Type.ReachedEndOfTopic:
- _channelManager.Incoming(command.ReachedEndOfTopic);
- break;
- case BaseCommand.Type.CloseProducer:
- _channelManager.Incoming(command.CloseProducer);
- break;
- case BaseCommand.Type.Ping:
- _pingPongHandler.Incoming(command.Ping);
- break;
- default:
- _requestResponseHandler.Incoming(command);
- break;
+ var commandSize = frame.ReadUInt32(0, true);
+ var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize));
+
+ switch (command.CommandType)
+ {
+ case BaseCommand.Type.Message:
+ _channelManager.Incoming(command.Message, frame.Slice(commandSize + 4));
+ break;
+ case BaseCommand.Type.CloseConsumer:
+ _channelManager.Incoming(command.CloseConsumer);
+ break;
+ case BaseCommand.Type.ActiveConsumerChange:
+ _channelManager.Incoming(command.ActiveConsumerChange);
+ break;
+ case BaseCommand.Type.ReachedEndOfTopic:
+ _channelManager.Incoming(command.ReachedEndOfTopic);
+ break;
+ case BaseCommand.Type.CloseProducer:
+ _channelManager.Incoming(command.CloseProducer);
+ break;
+ case BaseCommand.Type.Ping:
+ _pingPongHandler.Incoming(command.Ping);
+ break;
+ default:
+ _requestResponseHandler.Incoming(command);
+ break;
+ }
}
}
+ catch { }
}
public async ValueTask DisposeAsync()
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 9ccfbc0..4fafbf2 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -104,6 +104,7 @@
break;
}
}
+ catch { }
finally
{
_writer.Complete();