Make ready for release 2.3.1
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cb34e7b..a6f89d8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,12 @@
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [2.3.1] - 2022-04-20
+
+### Changed
+
+- Performance improvements, especially when consuming/reading
+
## [2.3.0] - 2022-03-18
### Added
diff --git a/src/DotPulsar/DotPulsar.csproj b/src/DotPulsar/DotPulsar.csproj
index 2a80124..ff7cd51 100644
--- a/src/DotPulsar/DotPulsar.csproj
+++ b/src/DotPulsar/DotPulsar.csproj
@@ -2,7 +2,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;netstandard2.1;netcoreapp3.1;net5.0;net6.0</TargetFrameworks>
- <Version>2.3.0</Version>
+ <Version>2.3.1</Version>
<AssemblyVersion>$(Version)</AssemblyVersion>
<FileVersion>$(Version)</FileVersion>
<Authors>ApachePulsar,DanskeCommodities,dblank</Authors>
@@ -23,7 +23,7 @@
<ItemGroup>
<PackageReference Include="HashDepot" Version="2.0.3" />
- <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.3" />
+ <PackageReference Include="Microsoft.Extensions.ObjectPool" Version="6.0.4" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="protobuf-net" Version="3.0.101" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.2" />
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 7c6fbe0..a57e667 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -28,6 +28,8 @@
public sealed class PulsarStream : IPulsarStream
{
+ private const int _frameSizePrefix = 4;
+ private const int _unknownFrameSize = 0;
private const long _pauseAtMoreThan10Mb = 10485760;
private const long _resumeAt5MbOrLess = 5242881;
private const int _chunkSize = 75000;
@@ -117,31 +119,32 @@
try
{
- uint? frameSize = null;
+ var frameSize = _unknownFrameSize;
+
while (true)
{
- var minimumSize = frameSize.HasValue ? (int) frameSize.Value + 4 : 4;
- var result = await _reader.ReadAtLeastAsync(minimumSize, cancellationToken).ConfigureAwait(false);
- var buffer = result.Buffer;
+ var minimumSize = _frameSizePrefix + frameSize;
+ var readResult = await _reader.ReadAtLeastAsync(minimumSize, cancellationToken).ConfigureAwait(false);
+ var buffer = readResult.Buffer;
while (true)
{
- if (buffer.Length < 4)
+ if (buffer.Length < _frameSizePrefix)
break;
- frameSize ??= buffer.ReadUInt32(0, true);
- var totalSize = frameSize.Value + 4;
+ frameSize = (int) buffer.ReadUInt32(0, true);
+ var totalSize = _frameSizePrefix + frameSize;
if (buffer.Length < totalSize)
break;
- yield return buffer.Slice(4, frameSize.Value);
+ yield return buffer.Slice(_frameSizePrefix, frameSize);
buffer = buffer.Slice(totalSize);
- frameSize = null;
+ frameSize = _unknownFrameSize;
}
- if (result.IsCompleted)
+ if (readResult.IsCompleted)
break;
_reader.AdvanceTo(buffer.Start);