blob: 6be45d2fc04d8c770d9e949ab6e19a2abb387510 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using OpenDAL.Interop.Result;
namespace OpenDAL;
/// <summary>
/// Read-only stream over an OpenDAL path.
/// </summary>
public sealed class OperatorInputStream : Stream
{
private IntPtr handle;
private bool disposed;
private byte[]? chunk;
private int chunkOffset;
internal OperatorInputStream(IntPtr handle)
{
if (handle == IntPtr.Zero)
{
throw new ArgumentException("Native input stream handle must not be zero.", nameof(handle));
}
this.handle = handle;
}
public override bool CanRead => !disposed;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
ArgumentNullException.ThrowIfNull(buffer);
return Read(buffer.AsSpan(offset, count));
}
public override int Read(Span<byte> destination)
{
ThrowIfDisposed();
if (destination.Length == 0)
{
return 0;
}
var totalRead = 0;
while (destination.Length > 0)
{
if (chunk is null || chunkOffset >= chunk.Length)
{
var next = NativeMethods.operator_input_stream_read_next(handle);
chunk = Operator.ToValueOrThrowAndRelease<byte[], OpenDALReadResult>(next);
chunkOffset = 0;
if (chunk.Length == 0)
{
return totalRead;
}
}
var available = chunk.Length - chunkOffset;
var toCopy = Math.Min(available, destination.Length);
chunk.AsSpan(chunkOffset, toCopy).CopyTo(destination);
chunkOffset += toCopy;
destination = destination[toCopy..];
totalRead += toCopy;
}
return totalRead;
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
return Task.FromResult(Read(buffer, offset, count));
}
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
return ValueTask.FromResult(Read(buffer.Span));
}
public override void Flush()
{
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
return Task.CompletedTask;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(disposed || handle == IntPtr.Zero, this);
}
protected override void Dispose(bool disposing)
{
if (!disposed)
{
NativeMethods.operator_input_stream_free(handle);
handle = IntPtr.Zero;
chunk = null;
chunkOffset = 0;
disposed = true;
}
base.Dispose(disposing);
}
}