Fixing two bugs.
Make ready for 1.0.2
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d5e998e..eada720 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,13 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [1.0.2] - 2021-04-30
+
+### Fixed
+
+- Closing a consumer or reader while the broker is streaming messages could take down the connection causing other consumers, readers, and producers of the connection to reconnect
+- In some circumstances, the protocol bytes could be misread leading to wrong messages parsing
+
## [1.0.1] - 2021-03-30
### Fixed
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 8f6cd7e..fc72713 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0</TargetFrameworks>
- <Version>1.0.1</Version>
+ <Version>1.0.2</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
diff --git a/src/DotPulsar/Internal/Channel.cs b/src/DotPulsar/Internal/Channel.cs
index b441680..eb67755 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -35,7 +35,16 @@
}
public void Received(MessagePackage message)
- => _enqueue.Enqueue(message);
+ {
+ try
+ {
+ _enqueue.Enqueue(message);
+ }
+ catch
+ {
+ // Ignore
+ }
+ }
public void Activated()
=> _eventRegister.Register(new ChannelActivated(_correlationId));
diff --git a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
index ea5fa12..21994fb 100644
--- a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
@@ -88,7 +88,7 @@
}
}
- if (read == 3)
+ if (read == 4)
break;
start = 0;
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 5264b25..b130318 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -91,6 +91,7 @@
await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
await _channel.DisposeAsync().ConfigureAwait(false);
}
+
public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
=> await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
diff --git a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
index b84268c..c4d9df6 100644
--- a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
@@ -127,34 +127,57 @@
actual.Should().Be(expected);
}
- [Fact]
- public void ReadUInt32_GivenSequenceWithMultipleSegments_ShouldGiveExceptedResult()
+#pragma warning disable xUnit1025 // Miscoded warning can't tell these are all different
+ [Theory]
+ [InlineData(new byte[] { }, new byte[] { 0x02, 0x03, 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03, 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03 }, new byte[] { 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03, 0x04 }, new byte[] { 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03, 0x04, 0x05 }, new byte[] { })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { }, new byte[] { 0x03, 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03 }, new byte[] { }, new byte[] { 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03, 0x04 }, new byte[] { }, new byte[] { 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03 }, new byte[] { 0x04 }, new byte[] { 0x05 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03, 0x04 }, new byte[] { 0x05 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03 }, new byte[] { 0x04, 0x05 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03 }, new byte[] { 0x04 }, new byte[] { 0x05 })]
+ [InlineData(new byte[] { 0x02, 0x03, 0x04 }, new byte[] { 0x05, 0x09 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03, 0x04, 0x05 }, new byte[] { 0x09 })]
+ [InlineData(new byte[] { 0x02 }, new byte[] { 0x03 }, new byte[] { 0x04, 0x05 }, new byte[] { 0x09 })]
+#pragma warning restore xUnit1025 // InlineData should be unique within the Theory it belongs to
+ public void ReadUInt32_GivenSequenceWithMultipleSegments_ShouldGiveExceptedResult(params byte[][] testPath)
{
//Arrange
- var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02, 0x03 }).Build();
+ var sequenceBuilder = new SequenceBuilder<byte>();
+ foreach (var array in testPath)
+ sequenceBuilder.Append(array);
+ var sequence = sequenceBuilder.Build();
//Act
var actual = sequence.ReadUInt32(0, true);
//Assert
- const uint expected = 66051;
+ const uint expected = 0x02030405;
actual.Should().Be(expected);
}
- [Fact]
- public void ReadUInt32_GivenSequenceWithMultipleSegmentsAndNonZeroStart_ShouldGiveExceptedResult()
+ [Theory]
+ [InlineData(2, new byte[] { 0x09, 0x09, 0x02 }, new byte[] { 0x03, 0x04, 0x05 }, new byte[] { 0x09, 0x09, 0x09 })]
+ [InlineData(3, new byte[] { 0x09, 0x09, 0x09 }, new byte[] { 0x02, 0x03, 0x04 }, new byte[] { 0x05, 0x09, 0x09 })]
+ [InlineData(4, new byte[] { 0x09, 0x09, 0x09 }, new byte[] { 0x09, 0x02, 0x03 }, new byte[] { 0x04, 0x05, 0x09 })]
+ public void ReadUInt32_GivenSequenceWithMultipleSegmentsAndNonZeroStart_ShouldGiveExceptedResult(long start, params byte[][] testPath)
{
//Arrange
- var sequence = new SequenceBuilder<byte>()
- .Append(new byte[] { 0x09, 0x09, 0x09 })
- .Append(new byte[] { 0x09, 0x00, 0x01 })
- .Append(new byte[] { 0x02, 0x03 }).Build();
+ var sequenceBuilder = new SequenceBuilder<byte>();
+ foreach (var array in testPath)
+ sequenceBuilder.Append(array);
+ var sequence = sequenceBuilder.Build();
//Act
- var actual = sequence.ReadUInt32(4, true);
+ var actual = sequence.ReadUInt32(start, true);
//Assert
- const uint expected = 66051;
+ const uint expected = 0x02030405;
actual.Should().Be(expected);
}
}