NetworkStream doesn't buffer data, so to minimize context switches when sending on the socket, we create larger chunks from small 'Memory' segments in the sequence.
diff --git a/src/DotPulsar/Internal/ChunkingPipeline.cs b/src/DotPulsar/Internal/ChunkingPipeline.cs
new file mode 100644
index 0000000..74ecf31
--- /dev/null
+++ b/src/DotPulsar/Internal/ChunkingPipeline.cs
@@ -0,0 +1,106 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Internal
+{
+    using System;
+    using System.Buffers;
+    using System.IO;
+    using System.Threading.Tasks;
+
+    public sealed class ChunkingPipeline
+    {
+        private readonly Stream _stream;
+        private readonly int _chunkSize;
+        private readonly byte[] _buffer;
+        private int _bufferCount;
+
+        public ChunkingPipeline(Stream stream, int chunkSize)
+        {
+            _stream = stream;
+            _chunkSize = chunkSize;
+            _buffer = new byte[_chunkSize];
+        }
+
+        private void CopyToBuffer(ReadOnlySequence<byte> sequence) => sequence.CopyTo(_buffer.AsSpan());
+
+        private void CopyToBuffer(ReadOnlyMemory<byte> memory) => memory.CopyTo(_buffer.AsMemory(_bufferCount));
+
+        public async ValueTask Send(ReadOnlySequence<byte> sequence)
+        {
+            var sequenceLength = sequence.Length;
+
+            if (sequenceLength <= _chunkSize)
+            {
+                CopyToBuffer(sequence);
+                _bufferCount = (int) sequenceLength;
+                await SendBuffer().ConfigureAwait(false);
+                return;
+            }
+
+            var enumerator = sequence.GetEnumerator();
+            var hasNext = true;
+
+            while (hasNext)
+            {
+                var current = enumerator.Current;
+                var currentLength = current.Length;
+                hasNext = enumerator.MoveNext();
+
+                if (currentLength > _chunkSize)
+                {
+                    await Send(current).ConfigureAwait(false);
+                    continue;
+                }
+
+                var total = currentLength + _bufferCount;
+
+                if (total > _chunkSize)
+                    await SendBuffer().ConfigureAwait(false);
+
+                if (_bufferCount != 0 || (hasNext && enumerator.Current.Length + total <= _chunkSize))
+                {
+                    CopyToBuffer(current);
+                    _bufferCount = total;
+                    continue;
+                }
+
+                await Send(current).ConfigureAwait(false);
+            }
+
+            await SendBuffer().ConfigureAwait(false);
+        }
+
+        private async ValueTask SendBuffer()
+        {
+            if (_bufferCount != 0)
+            {
+                await _stream.WriteAsync(_buffer, 0, _bufferCount).ConfigureAwait(false);
+                _bufferCount = 0;
+            }
+        }
+
+        private async ValueTask Send(ReadOnlyMemory<byte> memory)
+        {
+            await SendBuffer().ConfigureAwait(false);
+
+#if NETSTANDARD2_0
+            var data = memory.ToArray();
+            await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
+#else
+            await _stream.WriteAsync(memory).ConfigureAwait(false);
+#endif
+        }
+    }
+}
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index 01a6062..867f50f 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -30,8 +30,10 @@
     {
         private const long _pauseAtMoreThan10Mb = 10485760;
         private const long _resumeAt5MbOrLess = 5242881;
+        private const int _chunkSize = 75000;
 
         private readonly Stream _stream;
+        private readonly ChunkingPipeline _pipeline;
         private readonly PipeReader _reader;
         private readonly PipeWriter _writer;
         private int _isDisposed;
@@ -39,6 +41,7 @@
         public PulsarStream(Stream stream)
         {
             _stream = stream;
+            _pipeline = new ChunkingPipeline(stream, _chunkSize);
             var options = new PipeOptions(pauseWriterThreshold: _pauseAtMoreThan10Mb, resumeWriterThreshold: _resumeAt5MbOrLess);
             var pipe = new Pipe(options);
             _reader = pipe.Reader;
@@ -48,19 +51,7 @@
         public async Task Send(ReadOnlySequence<byte> sequence)
         {
             ThrowIfDisposed();
-
-#if NETSTANDARD2_0
-            foreach (var segment in sequence)
-            {
-                var data = segment.ToArray();
-                await _stream.WriteAsync(data, 0, data.Length).ConfigureAwait(false);
-            }
-#else
-            foreach (var segment in sequence)
-            {
-                await _stream.WriteAsync(segment).ConfigureAwait(false);
-            }
-#endif
+            await _pipeline.Send(sequence).ConfigureAwait(false);
         }
 
 #pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
@@ -88,7 +79,7 @@
 #endif
                 while (true)
                 {
-                    var memory = _writer.GetMemory(84999); // LOH - 1 byte
+                    var memory = _writer.GetMemory(84999);
 #if NETSTANDARD2_0
                     var bytesRead = await _stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false);
                     new Memory<byte>(buffer, 0, bytesRead).CopyTo(memory);
diff --git a/src/DotPulsar/Internal/SequenceBuilder.cs b/src/DotPulsar/Internal/SequenceBuilder.cs
index b2433ea..d7337b2 100644
--- a/src/DotPulsar/Internal/SequenceBuilder.cs
+++ b/src/DotPulsar/Internal/SequenceBuilder.cs
@@ -66,7 +66,7 @@
         {
             var node = _elements.First;
             if (node is null)
-                return new ReadOnlySequence<T>();
+                return ReadOnlySequence<T>.Empty;
 
             var current = new Segment(node.Value);
             var start = current;
diff --git a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
index 5650dd6..bdf1d1a 100644
--- a/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
+++ b/tests/DotPulsar.StressTests/DotPulsar.StressTests.csproj
@@ -6,7 +6,7 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.0" />
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
     <PackageReference Include="xunit" Version="2.4.1" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
       <PrivateAssets>all</PrivateAssets>
diff --git a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
index a3e617c..74cb6b9 100644
--- a/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
+++ b/tests/DotPulsar.Tests/DotPulsar.Tests.csproj
@@ -7,7 +7,7 @@
 
   <ItemGroup>
     <PackageReference Include="FluentAssertions" Version="5.10.3" />
-    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.0" />
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
     <PackageReference Include="xunit" Version="2.4.1" />
     <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
       <PrivateAssets>all</PrivateAssets>
diff --git a/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
new file mode 100644
index 0000000..de3895b
--- /dev/null
+++ b/tests/DotPulsar.Tests/Internal/ChunkingPipelineTests.cs
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace DotPulsar.Tests.Internal
+{
+    using DotPulsar.Internal;
+    using FluentAssertions;
+    using System;
+    using System.Buffers;
+    using System.IO;
+    using System.Linq;
+    using System.Threading.Tasks;
+    using Xunit;
+
+    public class ChunkingPipelineTests
+    {
+        [Fact]
+        public async Task Send_GivenSequenceIsUnderChunkSize_ShouldWriteArrayOnce()
+        {
+            //Arrange
+            var a = new byte[] { 0x00, 0x01, 0x02, 0x03 };
+            var b = new byte[] { 0x04, 0x05, 0x06, 0x07 };
+            var sequence = new SequenceBuilder<byte>().Append(a).Append(b).Build();
+            var mockStream = new MockStream();
+            var sut = new ChunkingPipeline(mockStream, 9);
+
+            //Act
+            await sut.Send(sequence);
+
+            //Assert
+            var expected = sequence.ToArray();
+            var actual = mockStream.GetReadOnlySequence();
+            actual.ToArray().Should().Equal(expected);
+            actual.IsSingleSegment.Should().BeTrue();
+        }
+
+        [Theory]
+        [InlineData(4, 6, 3, 4, 6, 3)]     // No segments can be merged
+        [InlineData(1, 6, 4, 7, 4, null)]  // Can merge a and b
+        [InlineData(4, 6, 1, 4, 7, null)]  // Can merge b and c
+        public async Task Send_GivenSequenceIsOverChunkSize_ShouldWriteMultipleArrays(int length1, int length2, int length3, int expected1, int expected2, int? expected3)
+        {
+            //Arrange
+            var a = Enumerable.Range(0, length1).Select(i => (byte) i).ToArray();
+            var b = Enumerable.Range(length1, length2).Select(i => (byte) i).ToArray();
+            var c = Enumerable.Range(length1 + length2, length3).Select(i => (byte) i).ToArray();
+            var sequence = new SequenceBuilder<byte>().Append(a).Append(b).Append(c).Build();
+            var mockStream = new MockStream();
+            var sut = new ChunkingPipeline(mockStream, 8);
+
+            //Act
+            await sut.Send(sequence);
+
+            //Assert
+            var expected = sequence.ToArray();
+            var actual = mockStream.GetReadOnlySequence();
+            actual.ToArray().Should().Equal(expected);
+            GetNumberOfSegments(actual).Should().Be(expected3.HasValue ? 3 : 2);
+
+            var segmentNumber = 0;
+            foreach (var segment in actual)
+            {
+                switch (segmentNumber)
+                {
+                    case 0:
+                        segment.Length.Should().Be(expected1);
+                        break;
+                    case 1:
+                        segment.Length.Should().Be(expected2);
+                        break;
+                    case 2:
+                        expected3.Should().NotBeNull();
+                        segment.Length.Should().Be(expected3);
+                        break;
+                }
+                ++segmentNumber;
+            }
+        }
+
+        private static int GetNumberOfSegments(ReadOnlySequence<byte> sequence)
+        {
+            var numberOfSegments = 0;
+            foreach (var segment in sequence)
+                ++numberOfSegments;
+            return numberOfSegments;
+        }
+
+        private class MockStream : Stream
+        {
+            private readonly SequenceBuilder<byte> _builder;
+
+            public MockStream() => _builder = new SequenceBuilder<byte>();
+            public override bool CanRead => throw new NotImplementedException();
+            public override bool CanSeek => throw new NotImplementedException();
+            public override bool CanWrite => true;
+            public override long Length => throw new NotImplementedException();
+            public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
+            public override void Flush() => throw new NotImplementedException();
+            public override int Read(byte[] buffer, int offset, int count) => throw new NotImplementedException();
+            public override long Seek(long offset, SeekOrigin origin) => throw new NotImplementedException();
+            public override void SetLength(long value) => throw new NotImplementedException();
+            public override void Write(byte[] buffer, int offset, int count) => _builder.Append(new ReadOnlyMemory<byte>(buffer, offset, count));
+            public ReadOnlySequence<byte> GetReadOnlySequence() => _builder.Build();
+        }
+    }
+}