| /* |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| namespace DotPulsar.Internal; |
| |
| using DotPulsar.Internal.Abstractions; |
| using DotPulsar.Internal.Exceptions; |
| using DotPulsar.Internal.Extensions; |
| using System.Buffers; |
| using System.IO.Pipelines; |
| using System.Runtime.CompilerServices; |
| |
| public sealed class PulsarStream : IPulsarStream |
| { |
| private const int FrameSizePrefix = 4; |
| private const int UnknownFrameSize = 0; |
| private const long PauseAtMoreThan10Mb = 10485760; |
| private const long ResumeAt5MbOrLess = 5242881; |
| private const int ChunkSize = 75000; |
| |
| private readonly Stream _stream; |
| private readonly ChunkingPipeline _pipeline; |
| private readonly Pipe _pipe; |
| private int _isDisposed; |
| |
| public PulsarStream(Stream stream) |
| { |
| _stream = stream; |
| _pipeline = new ChunkingPipeline(stream, ChunkSize); |
| var options = new PipeOptions(pauseWriterThreshold: PauseAtMoreThan10Mb, resumeWriterThreshold: ResumeAt5MbOrLess); |
| _pipe = new Pipe(options); |
| } |
| |
| public async Task Send(ReadOnlySequence<byte> sequence) |
| { |
| ThrowIfDisposed(); |
| await _pipeline.Send(sequence).ConfigureAwait(false); |
| } |
| |
| #if NETSTANDARD2_0 |
| public ValueTask DisposeAsync() |
| { |
| if (Interlocked.Exchange(ref _isDisposed, 1) == 0) |
| _stream.Dispose(); |
| |
| return new ValueTask(); |
| } |
| #else |
| public async ValueTask DisposeAsync() |
| { |
| if (Interlocked.Exchange(ref _isDisposed, 1) == 0) |
| await _stream.DisposeAsync().ConfigureAwait(false); |
| } |
| #endif |
| |
| private async Task FillPipe(CancellationToken cancellationToken) |
| { |
| var writer = _pipe.Writer; |
| |
| try |
| { |
| #if NETSTANDARD2_0 |
| var buffer = new byte[84999]; |
| #endif |
| while (true) |
| { |
| var memory = writer.GetMemory(84999); |
| #if NETSTANDARD2_0 |
| var bytesRead = await _stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); |
| new Memory<byte>(buffer, 0, bytesRead).CopyTo(memory); |
| #else |
| var bytesRead = await _stream.ReadAsync(memory, cancellationToken).ConfigureAwait(false); |
| #endif |
| if (bytesRead == 0) |
| break; |
| |
| writer.Advance(bytesRead); |
| |
| var result = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); |
| |
| if (result.IsCompleted) |
| break; |
| } |
| } |
| catch |
| { |
| // ignored |
| } |
| finally |
| { |
| await writer.CompleteAsync().ConfigureAwait(false); |
| } |
| } |
| |
| public async IAsyncEnumerable<ReadOnlySequence<byte>> Frames([EnumeratorCancellation] CancellationToken cancellationToken) |
| { |
| ThrowIfDisposed(); |
| |
| _ = Task.Factory.StartNew(async () => await FillPipe(cancellationToken).ConfigureAwait(false)); |
| |
| var reader = _pipe.Reader; |
| |
| try |
| { |
| var frameSize = UnknownFrameSize; |
| var totalSize = 0; |
| |
| while (true) |
| { |
| var minimumSize = FrameSizePrefix + frameSize; |
| var readResult = await reader.ReadAtLeastAsync(minimumSize, cancellationToken).ConfigureAwait(false); |
| var buffer = readResult.Buffer; |
| |
| while (true) |
| { |
| if (buffer.Length < FrameSizePrefix) |
| break; |
| |
| if (frameSize == UnknownFrameSize) |
| { |
| frameSize = (int) buffer.ReadUInt32(0, true); |
| totalSize = FrameSizePrefix + frameSize; |
| } |
| |
| if (buffer.Length < totalSize) |
| break; |
| |
| yield return buffer.Slice(FrameSizePrefix, frameSize); |
| |
| buffer = buffer.Slice(totalSize); |
| frameSize = UnknownFrameSize; |
| } |
| |
| if (readResult.IsCompleted || readResult.IsCanceled) |
| break; |
| |
| reader.AdvanceTo(buffer.Start); |
| } |
| } |
| finally |
| { |
| await reader.CompleteAsync().ConfigureAwait(false); |
| } |
| } |
| |
| private void ThrowIfDisposed() |
| { |
| if (_isDisposed != 0) |
| throw new PulsarStreamDisposedException(); |
| } |
| } |