Fix two bugs: closing a consumer/reader could take down the connection, and protocol bytes could be misread.
Prepare release 1.0.2.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d5e998e..eada720 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,13 @@
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [1.0.2] - 2021-04-30
+
+### Fixed
+
+- Closing a consumer or reader while the broker is streaming messages could take down the connection, causing other consumers, readers, and producers on that connection to reconnect
+- In some circumstances, the protocol bytes could be misread, leading to incorrect message parsing
+
 ## [1.0.1] - 2021-03-30
 
 ### Fixed
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 8f6cd7e..fc72713 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
 
   <PropertyGroup>
     <TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0</TargetFrameworks>
-    <Version>1.0.1</Version>
+    <Version>1.0.2</Version>
     <AssemblyVersion>$(Version)</AssemblyVersion>
     <FileVersion>$(Version)</FileVersion>
     <Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
diff --git a/src/DotPulsar/Internal/Channel.cs b/src/DotPulsar/Internal/Channel.cs
index b441680..eb67755 100644
--- a/src/DotPulsar/Internal/Channel.cs
+++ b/src/DotPulsar/Internal/Channel.cs
@@ -35,7 +35,16 @@
         }
 
         public void Received(MessagePackage message)
-            => _enqueue.Enqueue(message);
+        {
+            try
+            {
+                _enqueue.Enqueue(message);
+            }
+            catch
+            {
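+                // Ignore: a failed enqueue (presumably because this consumer/reader is closing) must not propagate and take down the shared connection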
+            }
+        }
 
         public void Activated()
             => _eventRegister.Register(new ChannelActivated(_correlationId));
diff --git a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
index ea5fa12..21994fb 100644
--- a/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
+++ b/src/DotPulsar/Internal/Extensions/ReadOnlySequenceExtensions.cs
@@ -88,7 +88,7 @@
                     }
                 }
 
-                if (read == 3)
+                if (read == 4)
                     break;
 
                 start = 0;
diff --git a/src/DotPulsar/Internal/Producer.cs b/src/DotPulsar/Internal/Producer.cs
index 5264b25..b130318 100644
--- a/src/DotPulsar/Internal/Producer.cs
+++ b/src/DotPulsar/Internal/Producer.cs
@@ -91,6 +91,7 @@
             await _channel.ClosedByClient(CancellationToken.None).ConfigureAwait(false);
             await _channel.DisposeAsync().ConfigureAwait(false);
         }
+
         public async ValueTask<MessageId> Send(TMessage message, CancellationToken cancellationToken)
             => await Send(_schema.Encode(message), cancellationToken).ConfigureAwait(false);
 
diff --git a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
index b84268c..c4d9df6 100644
--- a/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
+++ b/tests/DotPulsar.Tests/Internal/Extensions/ReadOnlySequenceExtensionsTests.cs
@@ -127,34 +127,57 @@
             actual.Should().Be(expected);
         }
 
-        [Fact]
-        public void ReadUInt32_GivenSequenceWithMultipleSegments_ShouldGiveExceptedResult()
+#pragma warning disable xUnit1025 // Miscoded warning can't tell these are all different
+        [Theory]
+        [InlineData(new byte[] { }, new byte[] { 0x02, 0x03, 0x04, 0x05 })]
+        [InlineData(new byte[] { 0x02 }, new byte[] { 0x03, 0x04, 0x05 })]
+        [InlineData(new byte[] { 0x02, 0x03 }, new byte[] { 0x04, 0x05 })]
+        [InlineData(new byte[] { 0x02, 0x03, 0x04 }, new byte[] { 0x05 })]
+        [InlineData(new byte[] { 0x02, 0x03, 0x04, 0x05 }, new byte[] { })]
+        [InlineData(new byte[] { 0x02 }, new byte[] { }, new byte[] { 0x03, 0x04, 0x05 })]
+        [InlineData(new byte[] { 0x02, 0x03 }, new byte[] { }, new byte[] { 0x04, 0x05 })]
+        [InlineData(new byte[] { 0x02, 0x03, 0x04 }, new byte[] { }, new byte[] { 0x05 })]
+        [InlineData(new byte[] { 0x02, 0x03 }, new byte[] { 0x04 }, new byte[] { 0x05 })]
+        [InlineData(new byte[] { 0x02 }, new byte[] { 0x03, 0x04 }, new byte[] { 0x05 })]
+        [InlineData(new byte[] { 0x02 }, new byte[] { 0x03 }, new byte[] { 0x04, 0x05 })]
+        [InlineData(new byte[] { 0x02 }, new byte[] { 0x03 }, new byte[] { 0x04 }, new byte[] { 0x05 })]
+        [InlineData(new byte[] { 0x02, 0x03, 0x04 }, new byte[] { 0x05, 0x09 })]
+        [InlineData(new byte[] { 0x02 }, new byte[] { 0x03, 0x04, 0x05 }, new byte[] { 0x09 })]
+        [InlineData(new byte[] { 0x02 }, new byte[] { 0x03 }, new byte[] { 0x04, 0x05 }, new byte[] { 0x09 })]
+#pragma warning restore xUnit1025 // InlineData should be unique within the Theory it belongs to
+        public void ReadUInt32_GivenSequenceWithMultipleSegments_ShouldGiveExceptedResult(params byte[][] testPath)
         {
             //Arrange
-            var sequence = new SequenceBuilder<byte>().Append(new byte[] { 0x00, 0x01 }).Append(new byte[] { 0x02, 0x03 }).Build();
+            var sequenceBuilder = new SequenceBuilder<byte>();
+            foreach (var array in testPath)
+                sequenceBuilder.Append(array);
+            var sequence = sequenceBuilder.Build();
 
             //Act
             var actual = sequence.ReadUInt32(0, true);
 
             //Assert
-            const uint expected = 66051;
+            const uint expected = 0x02030405;
             actual.Should().Be(expected);
         }
 
-        [Fact]
-        public void ReadUInt32_GivenSequenceWithMultipleSegmentsAndNonZeroStart_ShouldGiveExceptedResult()
+        [Theory]
+        [InlineData(2, new byte[] { 0x09, 0x09, 0x02 }, new byte[] { 0x03, 0x04, 0x05 }, new byte[] { 0x09, 0x09, 0x09 })]
+        [InlineData(3, new byte[] { 0x09, 0x09, 0x09 }, new byte[] { 0x02, 0x03, 0x04 }, new byte[] { 0x05, 0x09, 0x09 })]
+        [InlineData(4, new byte[] { 0x09, 0x09, 0x09 }, new byte[] { 0x09, 0x02, 0x03 }, new byte[] { 0x04, 0x05, 0x09 })]
+        public void ReadUInt32_GivenSequenceWithMultipleSegmentsAndNonZeroStart_ShouldGiveExceptedResult(long start, params byte[][] testPath)
         {
             //Arrange
-            var sequence = new SequenceBuilder<byte>()
-                .Append(new byte[] { 0x09, 0x09, 0x09 })
-                .Append(new byte[] { 0x09, 0x00, 0x01 })
-                .Append(new byte[] { 0x02, 0x03 }).Build();
+            var sequenceBuilder = new SequenceBuilder<byte>();
+            foreach (var array in testPath)
+                sequenceBuilder.Append(array);
+            var sequence = sequenceBuilder.Build();
 
             //Act
-            var actual = sequence.ReadUInt32(4, true);
+            var actual = sequence.ReadUInt32(start, true);
 
             //Assert
-            const uint expected = 66051;
+            const uint expected = 0x02030405;
             actual.Should().Be(expected);
         }
     }