Fixed pipeline in order to receive messages larger than 32kb
diff --git a/src/DotPulsar/Internal/PulsarStream.cs b/src/DotPulsar/Internal/PulsarStream.cs
index ed18b48..041b3af 100644
--- a/src/DotPulsar/Internal/PulsarStream.cs
+++ b/src/DotPulsar/Internal/PulsarStream.cs
@@ -10,6 +10,9 @@
{
public sealed class PulsarStream : IDisposable
{
+ const long PauseAtMoreThan10Mb = 10485760;
+ const long ResumeAt5MbOrLess = 5242881;
+
private readonly Stream _stream;
private readonly Action<uint, ReadOnlySequence<byte>> _handler;
private readonly CancellationTokenSource _tokenSource;
@@ -19,7 +22,8 @@
_stream = stream;
_handler = handler;
_tokenSource = new CancellationTokenSource();
- var pipe = new Pipe();
+ var options = new PipeOptions(pauseWriterThreshold: PauseAtMoreThan10Mb, resumeWriterThreshold: ResumeAt5MbOrLess);
+ var pipe = new Pipe(options);
var fill = FillPipe(_stream, pipe.Writer, _tokenSource.Token);
var read = ReadPipe(pipe.Reader, _tokenSource.Token);
IsClosed = Task.WhenAny(fill, read);