blob: 3f02f6854cad743c9fa4ce3e5b89d6ec67697235 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
using OpenDAL.Interop.Result;
namespace OpenDAL;
/// <summary>
/// Write-only stream over an OpenDAL path.
/// </summary>
public sealed class OperatorOutputStream : Stream
{
internal const int DefaultBufferSize = 16 * 1024;
private IntPtr handle;
private bool disposed;
private readonly byte[] buffer;
private int buffered;
internal OperatorOutputStream(IntPtr handle, int bufferSize)
{
if (handle == IntPtr.Zero)
{
throw new ArgumentException("Native output stream handle must not be zero.", nameof(handle));
}
if (bufferSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(bufferSize), "Buffer size must be greater than zero.");
}
this.handle = handle;
buffer = GC.AllocateUninitializedArray<byte>(bufferSize);
}
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => !disposed;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override void Write(byte[] source, int offset, int count)
{
ArgumentNullException.ThrowIfNull(source);
Write(source.AsSpan(offset, count));
}
public override void Write(ReadOnlySpan<byte> source)
{
ThrowIfDisposed();
while (!source.IsEmpty)
{
var writable = buffer.Length - buffered;
if (writable == 0)
{
FlushBuffered();
writable = buffer.Length;
}
var take = Math.Min(writable, source.Length);
source[..take].CopyTo(buffer.AsSpan(buffered));
buffered += take;
source = source[take..];
}
}
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Write(buffer, offset, count);
return Task.CompletedTask;
}
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
Write(buffer.Span);
return ValueTask.CompletedTask;
}
public override void Flush()
{
ThrowIfDisposed();
FlushBuffered();
var result = NativeMethods.operator_output_stream_flush(handle);
Operator.ThrowIfErrorAndRelease(result);
}
public override Task FlushAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
Flush();
return Task.CompletedTask;
}
public override int Read(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotSupportedException();
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
private void FlushBuffered()
{
if (buffered == 0)
{
return;
}
var result = NativeMethods.operator_output_stream_write(handle, buffer, (nuint)buffered);
Operator.ThrowIfErrorAndRelease(result);
buffered = 0;
}
private void ThrowIfDisposed()
{
ObjectDisposedException.ThrowIf(disposed || handle == IntPtr.Zero, this);
}
protected override void Dispose(bool disposing)
{
if (!disposed)
{
OpenDALException? pending = null;
try
{
Flush();
var close = NativeMethods.operator_output_stream_close(handle);
try
{
Operator.ThrowIfErrorAndRelease(close);
}
catch (OpenDALException ex)
{
pending = ex;
}
}
finally
{
NativeMethods.operator_output_stream_free(handle);
handle = IntPtr.Zero;
buffered = 0;
disposed = true;
}
if (pending is not null)
{
throw pending;
}
}
base.Dispose(disposing);
}
}