NetworkStream doesn't buffer data, so to minimize context switches when sending on the socket, we create larger chunks from small 'Memory' segments in the sequence.
diff --git a/src/DotPulsar/Internal/ChunkingPipeline.cs b/src/DotPulsar/Internal/ChunkingPipeline.cs
new file mode 100644
index 0000000..74ecf31
--- /dev/null
+++ b/src/DotPulsar/Internal/ChunkingPipeline.cs
@@ -0,0 +1,106 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+ using System;
+ using System.Buffers;
+ using System.IO;
+ using System.Threading.Tasks;
+
+ public sealed class ChunkingPipeline
+ {
+ private readonly Stream _stream;
+ private readonly int _chunkSize;
+ private readonly byte[] _buffer;
+ private int _bufferCount;
+
+ public ChunkingPipeline(Stream stream, int chunkSize)
+ {
+ _stream = stream;
+ _chunkSize = chunkSize;
+ _buffer = new byte[_chunkSize];
+ }
+
+ private void CopyToBuffer(ReadOnlySequence<byte> sequence) => sequence.CopyTo(_buffer.AsSpan());
+
+ private void CopyToBuffer(ReadOnlyMemory<byte> memory) => memory.CopyTo(_buffer.AsMemory(_bufferCount));
+
+ public async ValueTask Send(ReadOnlySequence<byte> sequence)
+ {
+ var sequenceLength = sequence.Length;
+
+ if (sequenceLength <= _chunkSize)
+ {
+ CopyToBuffer(sequence);
+ _bufferCount = (int) sequenceLength;
+ await SendBuffer().ConfigureAwait(false);
+ return;
+ }
+
+ var enumerator = sequence.GetEnumerator();
+ var hasNext = true;
+
+ while (hasNext)
+ {
+ var current = enumerator.Current;
+ var currentLength = current.Length;
+ hasNext = enumerator.MoveNext();
+
+ if (currentLength > _chunkSize)
+ {
+ await Send(current).ConfigureAwait(false);
+ continue;
+ }
+
+ var total = currentLength + _bufferCount;
+
+ if (total > _chunkSize)
+ await SendBuffer().ConfigureAwait(false);
+
+ if (_bufferCount != 0 || (hasNext && enumerator.Current.Length + total <= _chunkSize))
+ {
+ CopyToBuffer(current);
+ _bufferCount = total;
+ continue;
+ }
+
+ await Send(current).ConfigureAwait(false);
+ }
+
+ await SendBuffer().ConfigureAwait(false);
+ }
+
+ private async ValueTask SendBuffer()
+ {
+ if (_bufferCount != 0)
+ {
+ await _stream.WriteAsync(_buffer, 0, _bufferCount).ConfigureAwait(false);
+ _bufferCount = 0;
+ }
+ }
+
+ private async ValueTask Send(ReadOnlyMemory<byte> memory)
+ {
+ await SendBuffer().ConfigureAwait(false);
+
+#if NETSTANDARD2_0
+ var data = memory.ToArray();
+ await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
+#else
+ await _stream.WriteAsync(memory).ConfigureAwait(false);
+#endif
+ }
+ }
+}
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 01a6062..867f50f 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -30,8 +30,10 @@
{
private const long _pauseAtMoreThan10Mb = 10485760;
private const long _resumeAt5MbOrLess = 5242881;
+ private const int _chunkSize = 75000;
private readonly Stream _stream;
+ private readonly ChunkingPipeline _pipeline;
private readonly PipeReader _reader;
private readonly PipeWriter _writer;
private int _isDisposed;
@@ -39,6 +41,7 @@
public PulsarStream(Stream stream)
{
_stream = stream;
+ _pipeline = new ChunkingPipeline(stream, _chunkSize);
var options = new PipeOptions(pauseWriterThreshold: _pauseAtMoreThan10Mb, resumeWriterThreshold: _resumeAt5MbOrLess);
var pipe = new Pipe(options);
_reader = pipe.Reader;
@@ -48,19 +51,7 @@
public async Task Send(ReadOnlySequence<byte> sequence)
{
ThrowIfDisposed();
-
-#if NETSTANDARD2_0
- foreach (var segment in sequence)
- {
- var data = segment.ToArray();
- await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
- }
-#else
- foreach (var segment in sequence)
- {
- await _stream.WriteAsync(segment).ConfigureAwait(false);
- }
-#endif
+ await _pipeline.Send(sequence).ConfigureAwait(false);
}
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
@@ -88,7 +79,7 @@
#endif
while (true)
{
- var memory = _writer.GetMemory(84999); // LOH - 1 byte
+ var memory = _writer.GetMemory(84999);
#if NETSTANDARD2_0
var bytesRead = await _stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
new Memory<byte>(buffer, 0, bytesRead).CopyTo(memory);
diff --git a/src/DotPulsar/Internal/SequenceBuilder.cs b/src/DotPulsar/Internal/SequenceBuilder.cs
index b2433ea..d7337b2 100644
--- a/src/DotPulsar/Internal/SequenceBuilder.cs
+++ b/src/DotPulsar/Internal/SequenceBuilder.cs
@@ -66,7 +66,7 @@
{
var node = _elements.First;
if (node is null)
- return new ReadOnlySequence<T>();
+ return ReadOnlySequence<T>.Empty;
var current = new Segment(node.Value);
var start = current;
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index 5650dd6..bdf1d1a 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -6,7 +6,7 @@
</PropertyGroup>
<ItemGroup>
- <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.0" />
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets>
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index a3e617c..74cb6b9 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -7,7 +7,7 @@
<ItemGroup>
<PackageReference Include="FluentAssertions" Version="5.10.3" />
- <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.0" />
+ <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets>
diff --git a/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
new file mode 100644
index 0000000..de3895b
--- /dev/null
+++ b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+ using DotPulsar.Internal;
+ using FluentAssertions;
+ using System;
+ using System.Buffers;
+ using System.IO;
+ using System.Linq;
+ using System.Threading.Tasks;
+ using Xunit;
+
+ public class ChunkingPipelineTests
+ {
+ [Fact]
+ public async Task Send_GivenSequenceIsUnderChunkSize_ShouldWriteArrayOnce()
+ {
+ //Arrange
+ var a = new byte[] { 0x00, 0x01, 0x02, 0x03 };
+ var b = new byte[] { 0x04, 0x05, 0x06, 0x07 };
+ var sequence = new SequenceBuilder<byte>().Append(a).Append(b).Build();
+ var mockStream = new MockStream();
+ var sut = new ChunkingPipeline(mockStream, 9);
+
+ //Act
+ await sut.Send(sequence);
+
+ //Assert
+ var expected = sequence.ToArray();
+ var actual = mockStream.GetReadOnlySequence();
+ actual.ToArray().Should().Equal(expected);
+ actual.IsSingleSegment.Should().BeTrue();
+ }
+
+ [Theory]
+ [InlineData(4, 6, 3, 4, 6, 3)] // No segments can be merged
+ [InlineData(1, 6, 4, 7, 4, null)] // Can merge a and b
+ [InlineData(4, 6, 1, 4, 7, null)] // Can merge b and c
+ public async Task Send_GivenSequenceIsOverChunkSize_ShouldWriteMultipleArrays(int length1, int length2, int length3, int expected1, int expected2, int? expected3)
+ {
+ //Arrange
+ var a = Enumerable.Range(0, length1).Select(i => (byte) i).ToArray();
+ var b = Enumerable.Range(length1, length2).Select(i => (byte) i).ToArray();
+ var c = Enumerable.Range(length1 + length2, length3).Select(i => (byte) i).ToArray();
+ var sequence = new SequenceBuilder<byte>().Append(a).Append(b).Append(c).Build();
+ var mockStream = new MockStream();
+ var sut = new ChunkingPipeline(mockStream, 8);
+
+ //Act
+ await sut.Send(sequence);
+
+ //Assert
+ var expected = sequence.ToArray();
+ var actual = mockStream.GetReadOnlySequence();
+ actual.ToArray().Should().Equal(expected);
+ GetNumberOfSegments(actual).Should().Be(expected3.HasValue ? 3 : 2);
+
+ var segmentNumber = 0;
+ foreach (var segment in actual)
+ {
+ switch (segmentNumber)
+ {
+ case 0:
+ segment.Length.Should().Be(expected1);
+ break;
+ case 1:
+ segment.Length.Should().Be(expected2);
+ break;
+ case 2:
+ expected3.Should().NotBeNull();
+ segment.Length.Should().Be(expected3);
+ break;
+ }
+ ++segmentNumber;
+ }
+ }
+
+ private static int GetNumberOfSegments(ReadOnlySequence<byte> sequence)
+ {
+ var numberOfSegments = 0;
+ foreach (var segment in sequence)
+ ++numberOfSegments;
+ return numberOfSegments;
+ }
+
+ private class MockStream : Stream
+ {
+ private readonly SequenceBuilder<byte> _builder;
+
+ public MockStream() => _builder = new SequenceBuilder<byte>();
+ public override bool CanRead => throw new NotImplementedException();
+ public override bool CanSeek => throw new NotImplementedException();
+ public override bool CanWrite => true;
+ public override long Length => throw new NotImplementedException();
+ public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
+ public override void Flush() => throw new NotImplementedException();
+ public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
+ public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
+ public override void SetLength(long value) => throw new NotImplementedException();
+ public override void Write(byte[] buffer, int offset, int count) => _builder.Append(new ReadOnlyMemory<byte>(buffer, offset, count));
+ public ReadOnlySequence<byte> GetReadOnlySequence() => _builder.Build();
+ }
+ }
+}