blob: 17b320f869406eda2d14e358f7cb0569d240b092 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
using System;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using Apache.Arrow.Ipc;
namespace Apache.Arrow.C
{
public static class CArrowArrayStreamExporter
{
#if NET5_0_OR_GREATER
private static unsafe delegate* unmanaged<CArrowArrayStream*, CArrowSchema*, int> GetSchemaPtr => &GetSchema;
private static unsafe delegate* unmanaged<CArrowArrayStream*, CArrowArray*, int> GetNextPtr => &GetNext;
private static unsafe delegate* unmanaged<CArrowArrayStream*, byte*> GetLastErrorPtr => &GetLastError;
private static unsafe delegate* unmanaged<CArrowArrayStream*, void> ReleasePtr => &Release;
#else
internal unsafe delegate int GetSchemaArrayStream(CArrowArrayStream* cArrayStream, CArrowSchema* cSchema);
private static unsafe NativeDelegate<GetSchemaArrayStream> s_getSchemaArrayStream = new NativeDelegate<GetSchemaArrayStream>(GetSchema);
private static unsafe IntPtr GetSchemaPtr => s_getSchemaArrayStream.Pointer;
internal unsafe delegate int GetNextArrayStream(CArrowArrayStream* cArrayStream, CArrowArray* cArray);
private static unsafe NativeDelegate<GetNextArrayStream> s_getNextArrayStream = new NativeDelegate<GetNextArrayStream>(GetNext);
private static unsafe IntPtr GetNextPtr => s_getNextArrayStream.Pointer;
internal unsafe delegate byte* GetLastErrorArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe NativeDelegate<GetLastErrorArrayStream> s_getLastErrorArrayStream = new NativeDelegate<GetLastErrorArrayStream>(GetLastError);
private static unsafe IntPtr GetLastErrorPtr => s_getLastErrorArrayStream.Pointer;
internal unsafe delegate void ReleaseArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe NativeDelegate<ReleaseArrayStream> s_releaseArrayStream = new NativeDelegate<ReleaseArrayStream>(Release);
private static unsafe IntPtr ReleasePtr => s_releaseArrayStream.Pointer;
#endif
/// <summary>
/// Export an <see cref="IArrowArrayStream"/> to a <see cref="CArrowArrayStream"/>.
/// </summary>
/// <param name="arrayStream">The array stream to export</param>
/// <param name="cArrayStream">An allocated but uninitialized CArrowArrayStream pointer.</param>
/// <example>
/// <code>
/// CArrowArrayStream* exportPtr = CArrowArrayStream.Create();
/// CArrowArrayStreamExporter.ExportArray(arrayStream, exportPtr);
/// foreign_import_function(exportPtr);
/// </code>
/// </example>
public static unsafe void ExportArrayStream(IArrowArrayStream arrayStream, CArrowArrayStream* cArrayStream)
{
if (arrayStream == null)
{
throw new ArgumentNullException(nameof(arrayStream));
}
if (cArrayStream == null)
{
throw new ArgumentNullException(nameof(cArrayStream));
}
cArrayStream->private_data = ExportedArrayStream.Export(arrayStream);
cArrayStream->get_schema = GetSchemaPtr;
cArrayStream->get_next = GetNextPtr;
cArrayStream->get_last_error = GetLastErrorPtr;
cArrayStream->release = ReleasePtr;
}
#if NET5_0_OR_GREATER
[UnmanagedCallersOnly]
#endif
private unsafe static int GetSchema(CArrowArrayStream* cArrayStream, CArrowSchema* cSchema)
{
ExportedArrayStream arrayStream = null;
try
{
arrayStream = ExportedArrayStream.FromPointer(cArrayStream->private_data);
CArrowSchemaExporter.ExportSchema(arrayStream.ArrowArrayStream.Schema, cSchema);
return arrayStream.ClearError();
}
catch (Exception ex)
{
return arrayStream?.SetError(ex) ?? ExportedArrayStream.EOTHER;
}
}
#if NET5_0_OR_GREATER
[UnmanagedCallersOnly]
#endif
private unsafe static int GetNext(CArrowArrayStream* cArrayStream, CArrowArray* cArray)
{
ExportedArrayStream arrayStream = null;
try
{
cArray->release = default;
arrayStream = ExportedArrayStream.FromPointer(cArrayStream->private_data);
RecordBatch recordBatch = arrayStream.ArrowArrayStream.ReadNextRecordBatchAsync().Result;
if (recordBatch != null)
{
CArrowArrayExporter.ExportRecordBatch(recordBatch, cArray);
}
return arrayStream.ClearError();
}
catch (Exception ex)
{
return arrayStream?.SetError(ex) ?? ExportedArrayStream.EOTHER;
}
}
#if NET5_0_OR_GREATER
[UnmanagedCallersOnly]
#endif
private unsafe static byte* GetLastError(CArrowArrayStream* cArrayStream)
{
try
{
ExportedArrayStream arrayStream = ExportedArrayStream.FromPointer(cArrayStream->private_data);
return arrayStream.LastError;
}
catch (Exception)
{
return null;
}
}
#if NET5_0_OR_GREATER
[UnmanagedCallersOnly]
#endif
private unsafe static void Release(CArrowArrayStream* cArrayStream)
{
ExportedArrayStream.Free(&cArrayStream->private_data);
cArrayStream->release = default;
}
sealed unsafe class ExportedArrayStream : IDisposable
{
public const int EOTHER = 131;
ExportedArrayStream(IArrowArrayStream arrayStream)
{
ArrowArrayStream = arrayStream;
LastError = null;
}
public IArrowArrayStream ArrowArrayStream { get; private set; }
public byte* LastError { get; private set; }
public static void* Export(IArrowArrayStream arrayStream)
{
ExportedArrayStream result = new ExportedArrayStream(arrayStream);
GCHandle gch = GCHandle.Alloc(result);
return (void*)GCHandle.ToIntPtr(gch);
}
public static void Free(void** ptr)
{
GCHandle gch = GCHandle.FromIntPtr((IntPtr)(*ptr));
if (!gch.IsAllocated)
{
return;
}
((ExportedArrayStream)gch.Target).Dispose();
gch.Free();
*ptr = null;
}
public static ExportedArrayStream FromPointer(void* ptr)
{
GCHandle gch = GCHandle.FromIntPtr((IntPtr)ptr);
return (ExportedArrayStream)gch.Target;
}
public int SetError(Exception ex)
{
ReleaseLastError();
LastError = StringUtil.ToCStringUtf8(ex.Message);
return EOTHER;
}
public int ClearError()
{
ReleaseLastError();
return 0;
}
public void Dispose()
{
ReleaseLastError();
ArrowArrayStream?.Dispose();
ArrowArrayStream = null;
}
void ReleaseLastError()
{
if (LastError != null)
{
Marshal.FreeHGlobal((IntPtr)LastError);
LastError = null;
}
}
}
}
}